innerring: Add GAS pouring mechanism for a configurable list of wallets #128

Merged
fyrchik merged 1 commit from ironbee/frostfs-node:OBJECT-1145_wallet-pouring-for-ir into master 2023-03-20 12:50:07 +00:00
7 changed files with 83 additions and 1 deletions

View file

@ -4,6 +4,7 @@ Changelog for FrostFS Node
## [Unreleased] ## [Unreleased]
### Added ### Added
- Add GAS pouring mechanism for a configurable list of wallets (#128)
- Separate batching for replicated operations over the same container in pilorama (#1621) - Separate batching for replicated operations over the same container in pilorama (#1621)
- Doc for extended headers (#2128) - Doc for extended headers (#2128)
- New `frostfs_node_object_container_size` metric for tracking size of reqular objects in a container (#2116) - New `frostfs_node_object_container_size` metric for tracking size of reqular objects in a container (#2116)

View file

@ -102,6 +102,7 @@ func defaultConfiguration(cfg *viper.Viper) {
cfg.SetDefault("emit.mint.threshold", 1) cfg.SetDefault("emit.mint.threshold", 1)
cfg.SetDefault("emit.mint.value", 20000000) // 0.2 Fixed8 cfg.SetDefault("emit.mint.value", 20000000) // 0.2 Fixed8
cfg.SetDefault("emit.gas.balance_threshold", 0) cfg.SetDefault("emit.gas.balance_threshold", 0)
cfg.SetDefault("emit.extra_wallets", nil)
fyrchik marked this conversation as resolved Outdated

Isn't nil shorter and better?

Isn't `nil` shorter and better?
cfg.SetDefault("audit.task.exec_pool_size", 10) cfg.SetDefault("audit.task.exec_pool_size", 10)
cfg.SetDefault("audit.task.queue_capacity", 100) cfg.SetDefault("audit.task.queue_capacity", 100)

View file

@ -72,6 +72,10 @@ emit:
threshold: 1 # Lifetime of records in LRU cache of all deposit receivers in FrostFS epochs threshold: 1 # Lifetime of records in LRU cache of all deposit receivers in FrostFS epochs
gas: gas:
balance_threshold: 100000000000 # Fixed8 value of inner ring wallet balance threshold when GAS emission for deposit receivers is disabled; disabled by default balance_threshold: 100000000000 # Fixed8 value of inner ring wallet balance threshold when GAS emission for deposit receivers is disabled; disabled by default
extra_wallets: # wallet addresses that are included in gas emission process in equal share with network map nodes

Some comment will be appreciated. Like wallet addresses that are included in gas emission process in equal share with network map nodes.

Some comment will be appreciated. Like `wallet addresses that are included in gas emission process in equal share with network map nodes`.
- "NQcfMqU6pfXFwaaBN6KHcTpT63eMtzk6eH"
- "NaSVC4xKySQBpKr1XRVYFCHjLhuYXnMBrP"
- "NT9jL5XcxcDt2iTj67o2d5xNfDxquN3pPk"
workers: workers:
alphabet: 10 # Number of workers to process events from alphabet contract in parallel alphabet: 10 # Number of workers to process events from alphabet contract in parallel

View file

@ -49,7 +49,9 @@ import (
"github.com/nspcc-dev/neo-go/pkg/core/block" "github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/encoding/address"
"github.com/nspcc-dev/neo-go/pkg/encoding/fixedn" "github.com/nspcc-dev/neo-go/pkg/encoding/fixedn"
"github.com/nspcc-dev/neo-go/pkg/util"
"github.com/panjf2000/ants/v2" "github.com/panjf2000/ants/v2"
"github.com/spf13/viper" "github.com/spf13/viper"
"go.uber.org/atomic" "go.uber.org/atomic"
@ -803,8 +805,14 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan
} }
} }
parsedWallets, err := parseWalletAddressesFromStrings(cfg.GetStringSlice("emit.extra_wallets"))
if err != nil {
return nil, err
}

why do we have viper lib that deep? all the other usages are here in the New func (read values, store them in some var/field)?

why do we have `viper` lib that deep? all the other usages are here in the `New` func (read values, store them in some var/field)?

I guess it allows simpler SIGHUP reload implementation.

I guess it allows simpler SIGHUP reload implementation.

Let's consider SIGHUP in a separate task. Params structure should not accept viper -- otherwise we cannot handle any logical errors during config reread (we will just stop working until the config is fixed).

Let's consider SIGHUP in a separate task. `Params` structure should not accept viper -- otherwise we cannot handle any logical errors during config reread (we will just stop working until the config is fixed).
// create alphabet processor // create alphabet processor
alphabetProcessor, err := alphabet.New(&alphabet.Params{ alphabetProcessor, err := alphabet.New(&alphabet.Params{
ParsedWallets: parsedWallets,
Log: log, Log: log,
PoolSize: cfg.GetInt("workers.alphabet"), PoolSize: cfg.GetInt("workers.alphabet"),
AlphabetContracts: server.contracts.alphabet, AlphabetContracts: server.contracts.alphabet,
@ -1034,6 +1042,24 @@ func ParsePublicKeysFromStrings(pubKeys []string) (keys.PublicKeys, error) {
return publicKeys, nil return publicKeys, nil
} }
// parseWalletAddressesFromStrings returns a slice of util.Uint160 from a slice
// of strings.
func parseWalletAddressesFromStrings(wallets []string) ([]util.Uint160, error) {
if len(wallets) == 0 {
fyrchik marked this conversation as resolved Outdated

Purely aesthetic: wouldn't it be better to have if len(wallets) == 0 first, to have less indendation in the biggest part of a function.

Purely aesthetic: wouldn't it be better to have `if len(wallets) == 0` first, to have less indendation in the biggest part of a function.
return nil, nil
}
var err error
extraWallets := make([]util.Uint160, len(wallets))
for i := range wallets {
extraWallets[i], err = address.StringToUint160(wallets[i])
if err != nil {
return nil, err
}
}
return extraWallets, nil
}
func (s *Server) initConfigFromBlockchain() error { func (s *Server) initConfigFromBlockchain() error {
// get current epoch // get current epoch
epoch, err := s.netmapClient.Epoch() epoch, err := s.netmapClient.Epoch()

View file

@ -57,7 +57,7 @@ func (ap *Processor) processEmit() {
return return
} }
gasPerNode := fixedn.Fixed8(ap.storageEmission / uint64(ln)) gasPerNode := fixedn.Fixed8(ap.storageEmission / uint64(ln+len(ap.parsedWallets)))
for i := range nmNodes { for i := range nmNodes {
keyBytes := nmNodes[i].PublicKey() keyBytes := nmNodes[i].PublicKey()
@ -79,4 +79,17 @@ func (ap *Processor) processEmit() {
) )
} }
} }
err = ap.morphClient.BatchTransferGas(ap.parsedWallets, gasPerNode)
fyrchik marked this conversation as resolved Outdated

It would be nice to transfer everything in one transaction. Is it possible with the current API?

It would be nice to transfer everything in one transaction. Is it possible with the current API?

I am not sure what you mean. Could you elaborate, please?

I am not sure what you mean. Could you elaborate, please?

TransferGas performs a contract call of the GAS contract and sends a transaction. However, there could be multiple contract calls in a single transaction -- it's just a byte-code after all. neo-go has a convenient API for this 8dc5b38568/pkg/rpcclient/nep17/nep17.go (L117)

The question was whether we could easily do the same here or it should be postponed because it requires some refactoring.

`TransferGas` performs a contract call of the GAS contract and sends a transaction. However, there could be multiple contract calls in a single transaction -- it's just a byte-code after all. neo-go has a convenient API for this https://github.com/nspcc-dev/neo-go/blob/8dc5b38568fe6de0346f1a2deb1f3235ad7ff524/pkg/rpcclient/nep17/nep17.go#L117 The question was whether we could easily do the same here or it should be postponed because it requires some refactoring.
if err != nil {

the same operation every processEmit call?

the same operation every `processEmit` call?

Agree, we should parse it after we have read the configuration and before providing it to a constructor. This way we also avoid logging the same error again and again.

Agree, we should parse it after we have read the configuration and before providing it to a constructor. This way we also avoid logging the same error again and again.

We can parse it once on start and on every SIGHUP later, I think.

We can parse it once on start and on every SIGHUP later, I think.

@carpawell @fyrchik if I add some client.BatchTransferGas method which uses MultiTransfer, it still needs a []string of addresses for logging purposes.

  1. TransferGas-like logging on debug level: https://git.frostfs.info/TrueCloudLab/frostfs-node/src/branch/master/pkg/morph/client/client.go#L239
  2. Error logging at process_emit.go: https://git.frostfs.info/TrueCloudLab/frostfs-node/src/branch/master/pkg/innerring/processors/alphabet/process_emit.go#L74

#1 happens every transfer, #2 happens only in case of an error. Both need a conversion code []Uint160 -> []string to log the data with zap.Strings().

The conversion should happen on both levels (process_emit.go and client.go) or we have to convert the data at process_emit.go and then pass it to new method client.BatchTransferGas so it could log the array.

Is this conversion acceptable? Does not look too well to me.

@carpawell @fyrchik if I add some `client.BatchTransferGas` method which uses MultiTransfer, it still needs a []string of addresses for logging purposes. 1) TransferGas-like logging on debug level: https://git.frostfs.info/TrueCloudLab/frostfs-node/src/branch/master/pkg/morph/client/client.go#L239 2) Error logging at process_emit.go: https://git.frostfs.info/TrueCloudLab/frostfs-node/src/branch/master/pkg/innerring/processors/alphabet/process_emit.go#L74 #1 happens every transfer, #2 happens only in case of an error. Both need a conversion code []Uint160 -> []string to log the data with `zap.Strings()`. The conversion should happen on both levels (process_emit.go and client.go) or we have to convert the data at process_emit.go and then pass it to new method `client.BatchTransferGas` so it could log the array. Is this conversion acceptable? Does not look too well to me.
receiversLog := make([]string, len(ap.parsedWallets))
for i, addr := range ap.parsedWallets {

Is there a reason why we use addresses in config and hex format here?

Is there a reason why we use addresses in config and hex format here?

Not sure if I understand the question fully.

Previously, my code transformed addresses as strings into Uint160 every emit cycle. Following the comments I changed it so that addresses get transformed into Uint160 once on init phase. However, now the logging requires transforming them into strings.

Not sure if I understand the question fully. Previously, my code transformed addresses as strings into Uint160 every emit cycle. Following the comments I changed it so that addresses get transformed into Uint160 once on init phase. However, now the logging requires transforming them into strings.

I mean using address.Uint160ToString from neo-go instead of hex-string.

I mean using `address.Uint160ToString` from neo-go instead of hex-string.
receiversLog[i] = addr.StringLE()
}
ap.log.Warn("can't transfer gas to wallet",
zap.Strings("receivers", receiversLog),
zap.Int64("amount", int64(gasPerNode)),
zap.String("error", err.Error()),
fyrchik marked this conversation as resolved Outdated

code duplication?

code duplication?

Nodes and wallets get processed separately. TransferGas is called in both batches.

Nodes and wallets get processed separately. TransferGas is called in both batches.

i meant that we can handle both netmap nodes and extra wallets (public keys parsing for nodes and parsing/just appending slice for extra wallets) separately and just have one sending for loop after that OR do that inside one transaction as said @fyrchik (if it is possible with the current API of course)

i meant that we can handle both netmap nodes and extra wallets (public keys parsing for nodes and parsing/just appending slice for extra wallets) separately and just have one sending `for` loop after that **OR** do that inside one transaction as said @fyrchik (if it is possible with the current API of course)
)
}
} }

View file

@ -33,6 +33,7 @@ type (
// Processor of events produced for alphabet contracts in the sidechain. // Processor of events produced for alphabet contracts in the sidechain.

such desc says nothing about what that wallets are used for. do we need an interface here? why not just []string or parsed wallet addresses (uint160)?

such desc says nothing about what that wallets are used for. do we need an interface here? why not just `[]string` or parsed wallet addresses (uint160)?
Processor struct { Processor struct {
parsedWallets []util.Uint160
log *logger.Logger log *logger.Logger
pool *ants.Pool pool *ants.Pool
alphabetContracts Contracts alphabetContracts Contracts
@ -44,6 +45,7 @@ type (
// Params of the processor constructor. // Params of the processor constructor.
Params struct { Params struct {
ParsedWallets []util.Uint160
Log *logger.Logger Log *logger.Logger
PoolSize int PoolSize int
AlphabetContracts Contracts AlphabetContracts Contracts
@ -73,6 +75,7 @@ func New(p *Params) (*Processor, error) {
} }
return &Processor{ return &Processor{
parsedWallets: p.ParsedWallets,
log: p.Log, log: p.Log,
pool: pool, pool: pool,
alphabetContracts: p.AlphabetContracts, alphabetContracts: p.AlphabetContracts,

View file

@ -244,6 +244,40 @@ func (c *Client) TransferGas(receiver util.Uint160, amount fixedn.Fixed8) error
return nil return nil
} }
func (c *Client) BatchTransferGas(receivers []util.Uint160, amount fixedn.Fixed8) error {
c.switchLock.RLock()
defer c.switchLock.RUnlock()
if c.inactive {
return ErrConnectionLost
}
transferParams := make([]nep17.TransferParameters, len(receivers))
receiversLog := make([]string, len(receivers))
for i, receiver := range receivers {
transferParams[i] = nep17.TransferParameters{
From: c.accAddr,
To: receiver,
Amount: big.NewInt(int64(amount)),
Data: nil,
}
receiversLog[i] = receiver.StringLE()
}
txHash, vub, err := c.gasToken.MultiTransfer(transferParams)
if err != nil {
return err
}
c.logger.Debug("batch gas transfer invoke",
zap.Strings("to", receiversLog),
zap.Stringer("tx_hash", txHash.Reverse()),
zap.Uint32("vub", vub))
return nil
}
// Wait function blocks routing execution until there // Wait function blocks routing execution until there
// are `n` new blocks in the chain. // are `n` new blocks in the chain.
// //