innerring: Add GAS pouring mechanism for a configurable list of wallets #128
7 changed files with 83 additions and 1 deletions
|
@ -4,6 +4,7 @@ Changelog for FrostFS Node
|
||||||
## [Unreleased]
|
## [Unreleased]
|
||||||
|
|
||||||
### Added
|
### Added
|
||||||
|
- Add GAS pouring mechanism for a configurable list of wallets (#128)
|
||||||
- Separate batching for replicated operations over the same container in pilorama (#1621)
|
- Separate batching for replicated operations over the same container in pilorama (#1621)
|
||||||
- Doc for extended headers (#2128)
|
- Doc for extended headers (#2128)
|
||||||
- New `frostfs_node_object_container_size` metric for tracking size of reqular objects in a container (#2116)
|
- New `frostfs_node_object_container_size` metric for tracking size of reqular objects in a container (#2116)
|
||||||
|
|
|
@ -102,6 +102,7 @@ func defaultConfiguration(cfg *viper.Viper) {
|
||||||
cfg.SetDefault("emit.mint.threshold", 1)
|
cfg.SetDefault("emit.mint.threshold", 1)
|
||||||
cfg.SetDefault("emit.mint.value", 20000000) // 0.2 Fixed8
|
cfg.SetDefault("emit.mint.value", 20000000) // 0.2 Fixed8
|
||||||
cfg.SetDefault("emit.gas.balance_threshold", 0)
|
cfg.SetDefault("emit.gas.balance_threshold", 0)
|
||||||
|
cfg.SetDefault("emit.extra_wallets", nil)
|
||||||
|
|
||||||
cfg.SetDefault("audit.task.exec_pool_size", 10)
|
cfg.SetDefault("audit.task.exec_pool_size", 10)
|
||||||
cfg.SetDefault("audit.task.queue_capacity", 100)
|
cfg.SetDefault("audit.task.queue_capacity", 100)
|
||||||
|
|
|
@ -72,6 +72,10 @@ emit:
|
||||||
threshold: 1 # Lifetime of records in LRU cache of all deposit receivers in FrostFS epochs
|
threshold: 1 # Lifetime of records in LRU cache of all deposit receivers in FrostFS epochs
|
||||||
gas:
|
gas:
|
||||||
balance_threshold: 100000000000 # Fixed8 value of inner ring wallet balance threshold when GAS emission for deposit receivers is disabled; disabled by default
|
balance_threshold: 100000000000 # Fixed8 value of inner ring wallet balance threshold when GAS emission for deposit receivers is disabled; disabled by default
|
||||||
|
extra_wallets: # wallet addresses that are included in gas emission process in equal share with network map nodes
|
||||||
|
- "NQcfMqU6pfXFwaaBN6KHcTpT63eMtzk6eH"
|
||||||
|
- "NaSVC4xKySQBpKr1XRVYFCHjLhuYXnMBrP"
|
||||||
|
- "NT9jL5XcxcDt2iTj67o2d5xNfDxquN3pPk"
|
||||||
|
|
||||||
workers:
|
workers:
|
||||||
alphabet: 10 # Number of workers to process events from alphabet contract in parallel
|
alphabet: 10 # Number of workers to process events from alphabet contract in parallel
|
||||||
|
|
|
@ -49,7 +49,9 @@ import (
|
||||||
"github.com/nspcc-dev/neo-go/pkg/core/block"
|
"github.com/nspcc-dev/neo-go/pkg/core/block"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
|
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/encoding/address"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/encoding/fixedn"
|
"github.com/nspcc-dev/neo-go/pkg/encoding/fixedn"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/util"
|
||||||
"github.com/panjf2000/ants/v2"
|
"github.com/panjf2000/ants/v2"
|
||||||
"github.com/spf13/viper"
|
"github.com/spf13/viper"
|
||||||
"go.uber.org/atomic"
|
"go.uber.org/atomic"
|
||||||
|
@ -803,8 +805,14 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
parsedWallets, err := parseWalletAddressesFromStrings(cfg.GetStringSlice("emit.extra_wallets"))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
// create alphabet processor
|
// create alphabet processor
|
||||||
alphabetProcessor, err := alphabet.New(&alphabet.Params{
|
alphabetProcessor, err := alphabet.New(&alphabet.Params{
|
||||||
|
ParsedWallets: parsedWallets,
|
||||||
Log: log,
|
Log: log,
|
||||||
PoolSize: cfg.GetInt("workers.alphabet"),
|
PoolSize: cfg.GetInt("workers.alphabet"),
|
||||||
AlphabetContracts: server.contracts.alphabet,
|
AlphabetContracts: server.contracts.alphabet,
|
||||||
|
@ -1034,6 +1042,24 @@ func ParsePublicKeysFromStrings(pubKeys []string) (keys.PublicKeys, error) {
|
||||||
return publicKeys, nil
|
return publicKeys, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// parseWalletAddressesFromStrings returns a slice of util.Uint160 from a slice
|
||||||
|
// of strings.
|
||||||
|
func parseWalletAddressesFromStrings(wallets []string) ([]util.Uint160, error) {
|
||||||
|
if len(wallets) == 0 {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var err error
|
||||||
|
extraWallets := make([]util.Uint160, len(wallets))
|
||||||
|
for i := range wallets {
|
||||||
|
extraWallets[i], err = address.StringToUint160(wallets[i])
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return extraWallets, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (s *Server) initConfigFromBlockchain() error {
|
func (s *Server) initConfigFromBlockchain() error {
|
||||||
// get current epoch
|
// get current epoch
|
||||||
epoch, err := s.netmapClient.Epoch()
|
epoch, err := s.netmapClient.Epoch()
|
||||||
|
|
|
@ -57,7 +57,7 @@ func (ap *Processor) processEmit() {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
gasPerNode := fixedn.Fixed8(ap.storageEmission / uint64(ln))
|
gasPerNode := fixedn.Fixed8(ap.storageEmission / uint64(ln+len(ap.parsedWallets)))
|
||||||
|
|
||||||
for i := range nmNodes {
|
for i := range nmNodes {
|
||||||
keyBytes := nmNodes[i].PublicKey()
|
keyBytes := nmNodes[i].PublicKey()
|
||||||
|
@ -79,4 +79,17 @@ func (ap *Processor) processEmit() {
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
err = ap.morphClient.BatchTransferGas(ap.parsedWallets, gasPerNode)
|
||||||
|
if err != nil {
|
||||||
|
receiversLog := make([]string, len(ap.parsedWallets))
|
||||||
|
for i, addr := range ap.parsedWallets {
|
||||||
|
receiversLog[i] = addr.StringLE()
|
||||||
|
}
|
||||||
|
ap.log.Warn("can't transfer gas to wallet",
|
||||||
|
zap.Strings("receivers", receiversLog),
|
||||||
|
zap.Int64("amount", int64(gasPerNode)),
|
||||||
|
zap.String("error", err.Error()),
|
||||||
|
)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -33,6 +33,7 @@ type (
|
||||||
|
|
||||||
// Processor of events produced for alphabet contracts in the sidechain.
|
// Processor of events produced for alphabet contracts in the sidechain.
|
||||||
Processor struct {
|
Processor struct {
|
||||||
|
parsedWallets []util.Uint160
|
||||||
log *logger.Logger
|
log *logger.Logger
|
||||||
pool *ants.Pool
|
pool *ants.Pool
|
||||||
alphabetContracts Contracts
|
alphabetContracts Contracts
|
||||||
|
@ -44,6 +45,7 @@ type (
|
||||||
|
|
||||||
// Params of the processor constructor.
|
// Params of the processor constructor.
|
||||||
Params struct {
|
Params struct {
|
||||||
|
ParsedWallets []util.Uint160
|
||||||
Log *logger.Logger
|
Log *logger.Logger
|
||||||
PoolSize int
|
PoolSize int
|
||||||
AlphabetContracts Contracts
|
AlphabetContracts Contracts
|
||||||
|
@ -73,6 +75,7 @@ func New(p *Params) (*Processor, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
return &Processor{
|
return &Processor{
|
||||||
|
parsedWallets: p.ParsedWallets,
|
||||||
log: p.Log,
|
log: p.Log,
|
||||||
pool: pool,
|
pool: pool,
|
||||||
alphabetContracts: p.AlphabetContracts,
|
alphabetContracts: p.AlphabetContracts,
|
||||||
|
|
|
@ -244,6 +244,40 @@ func (c *Client) TransferGas(receiver util.Uint160, amount fixedn.Fixed8) error
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *Client) BatchTransferGas(receivers []util.Uint160, amount fixedn.Fixed8) error {
|
||||||
|
c.switchLock.RLock()
|
||||||
|
defer c.switchLock.RUnlock()
|
||||||
|
|
||||||
|
if c.inactive {
|
||||||
|
return ErrConnectionLost
|
||||||
|
}
|
||||||
|
|
||||||
|
transferParams := make([]nep17.TransferParameters, len(receivers))
|
||||||
|
receiversLog := make([]string, len(receivers))
|
||||||
|
|
||||||
|
for i, receiver := range receivers {
|
||||||
|
transferParams[i] = nep17.TransferParameters{
|
||||||
|
From: c.accAddr,
|
||||||
|
To: receiver,
|
||||||
|
Amount: big.NewInt(int64(amount)),
|
||||||
|
Data: nil,
|
||||||
|
}
|
||||||
|
receiversLog[i] = receiver.StringLE()
|
||||||
|
}
|
||||||
|
|
||||||
|
txHash, vub, err := c.gasToken.MultiTransfer(transferParams)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
c.logger.Debug("batch gas transfer invoke",
|
||||||
|
zap.Strings("to", receiversLog),
|
||||||
|
zap.Stringer("tx_hash", txHash.Reverse()),
|
||||||
|
zap.Uint32("vub", vub))
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// Wait function blocks routing execution until there
|
// Wait function blocks routing execution until there
|
||||||
// are `n` new blocks in the chain.
|
// are `n` new blocks in the chain.
|
||||||
//
|
//
|
||||||
|
|
Loading…
Reference in a new issue