[#125] ir: Set extra wallets on SIGHUP

Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
This commit is contained in:
Anton Nikiforov 2023-05-12 10:41:04 +03:00 committed by Evgenii Stratonikov
parent 0624820909
commit 6f47c75e43
7 changed files with 61 additions and 36 deletions

View file

@ -8,6 +8,7 @@ Changelog for FrostFS Node
- Change log level on SIGHUP for ir (#125) - Change log level on SIGHUP for ir (#125)
- Reload pprof and metrics on SIGHUP for ir (#125) - Reload pprof and metrics on SIGHUP for ir (#125)
- Support copies number parameter in `frostfs-cli object put` (#351) - Support copies number parameter in `frostfs-cli object put` (#351)
- Set extra wallets on SIGHUP for ir (#125)
### Changed ### Changed
- `frostfs-cli util locode generate` is now much faster (#309) - `frostfs-cli util locode generate` is now much faster (#309)

View file

@ -62,6 +62,11 @@ func watchForSignal(cancel func()) {
} }
pprofCmp.reload() pprofCmp.reload()
metricsCmp.reload() metricsCmp.reload()
log.Info(logs.FrostFSIRReloadExtraWallets)
err = innerRing.SetExtraWallets(cfg)
if err != nil {
log.Error(logs.FrostFSNodeConfigurationReading, zap.Error(err))
}
log.Info(logs.FrostFSNodeConfigurationHasBeenReloadedSuccessfully) log.Info(logs.FrostFSNodeConfigurationHasBeenReloadedSuccessfully)
case syscall.SIGTERM, syscall.SIGINT: case syscall.SIGTERM, syscall.SIGINT:
log.Info(logs.FrostFSNodeTerminationSignalHasBeenReceivedStopping) log.Info(logs.FrostFSNodeTerminationSignalHasBeenReceivedStopping)

View file

@ -414,6 +414,7 @@ const (
FrostFSIRApplicationStopped = "application stopped" // Info in ../node/cmd/frostfs-ir/main.go FrostFSIRApplicationStopped = "application stopped" // Info in ../node/cmd/frostfs-ir/main.go
FrostFSIRCouldntCreateRPCClientForEndpoint = "could not create RPC client for endpoint" // Debug in ../node/pkg/morph/client/constructor.go FrostFSIRCouldntCreateRPCClientForEndpoint = "could not create RPC client for endpoint" // Debug in ../node/pkg/morph/client/constructor.go
FrostFSIRCreatedRPCClientForEndpoint = "created RPC client for endpoint" // Info in ../node/pkg/morph/client/constructor.go FrostFSIRCreatedRPCClientForEndpoint = "created RPC client for endpoint" // Info in ../node/pkg/morph/client/constructor.go
FrostFSIRReloadExtraWallets = "reload extra wallets" // Info in ../node/cmd/frostfs-ir/config.go
FrostFSNodeCouldNotReadCertificateFromFile = "could not read certificate from file" // Error in ../node/cmd/frostfs-node/grpc.go FrostFSNodeCouldNotReadCertificateFromFile = "could not read certificate from file" // Error in ../node/cmd/frostfs-node/grpc.go
FrostFSNodeCantListenGRPCEndpoint = "can't listen gRPC endpoint" // Error in ../node/cmd/frostfs-node/grpc.go FrostFSNodeCantListenGRPCEndpoint = "can't listen gRPC endpoint" // Error in ../node/cmd/frostfs-node/grpc.go
FrostFSNodeStopListeningGRPCEndpoint = "stop listening gRPC endpoint" // Info in ../node/cmd/frostfs-node/grpc.go FrostFSNodeStopListeningGRPCEndpoint = "stop listening gRPC endpoint" // Info in ../node/cmd/frostfs-node/grpc.go

View file

@ -204,7 +204,7 @@ func (s *Server) createIRFetcher() irFetcher {
return irf return irf
} }
func (s *Server) initTimers(cfg *viper.Viper, processors *serverProcessors, morphClients *serverMorphClients) { func (s *Server) initTimers(cfg *viper.Viper, morphClients *serverMorphClients) {
s.epochTimer = newEpochTimer(&epochTimerArgs{ s.epochTimer = newEpochTimer(&epochTimerArgs{
l: s.log, l: s.log,
alphabetState: s, alphabetState: s,
@ -219,21 +219,21 @@ func (s *Server) initTimers(cfg *viper.Viper, processors *serverProcessors, morp
// initialize emission timer // initialize emission timer
emissionTimer := newEmissionTimer(&emitTimerArgs{ emissionTimer := newEmissionTimer(&emitTimerArgs{
ap: processors.AlphabetProcessor, ap: s.alphabetProcessor,
emitDuration: cfg.GetUint32("timers.emit"), emitDuration: cfg.GetUint32("timers.emit"),
}) })
s.addBlockTimer(emissionTimer) s.addBlockTimer(emissionTimer)
} }
func (s *Server) initAlphabetProcessor(cfg *viper.Viper) (*alphabet.Processor, error) { func (s *Server) initAlphabetProcessor(cfg *viper.Viper) error {
parsedWallets, err := parseWalletAddressesFromStrings(cfg.GetStringSlice("emit.extra_wallets")) parsedWallets, err := parseWalletAddressesFromStrings(cfg.GetStringSlice("emit.extra_wallets"))
if err != nil { if err != nil {
return nil, err return err
} }
// create alphabet processor // create alphabet processor
alphabetProcessor, err := alphabet.New(&alphabet.Params{ s.alphabetProcessor, err = alphabet.New(&alphabet.Params{
ParsedWallets: parsedWallets, ParsedWallets: parsedWallets,
Log: s.log, Log: s.log,
PoolSize: cfg.GetInt("workers.alphabet"), PoolSize: cfg.GetInt("workers.alphabet"),
@ -244,15 +244,15 @@ func (s *Server) initAlphabetProcessor(cfg *viper.Viper) (*alphabet.Processor, e
StorageEmission: cfg.GetUint64("emit.storage.amount"), StorageEmission: cfg.GetUint64("emit.storage.amount"),
}) })
if err != nil { if err != nil {
return nil, err return err
} }
err = bindMorphProcessor(alphabetProcessor, s) err = bindMorphProcessor(s.alphabetProcessor, s)
if err != nil { if err != nil {
return nil, err return err
} }
return alphabetProcessor, nil return nil
} }
func (s *Server) initContainerProcessor(cfg *viper.Viper, cnrClient *container.Client, func (s *Server) initContainerProcessor(cfg *viper.Viper, cnrClient *container.Client,
@ -425,13 +425,7 @@ func (s *Server) initClientsFromMorph() (*serverMorphClients, error) {
return result, nil return result, nil
} }
type serverProcessors struct { func (s *Server) initProcessors(cfg *viper.Viper, morphClients *serverMorphClients) error {
AlphabetProcessor *alphabet.Processor
}
func (s *Server) initProcessors(cfg *viper.Viper, morphClients *serverMorphClients) (*serverProcessors, error) {
result := &serverProcessors{}
irf := s.createIRFetcher() irf := s.createIRFetcher()
s.statusIndex = newInnerRingIndexer( s.statusIndex = newInnerRingIndexer(
@ -443,35 +437,35 @@ func (s *Server) initProcessors(cfg *viper.Viper, morphClients *serverMorphClien
alphaSync, err := s.createAlphaSync(cfg, morphClients.FrostFSClient, irf) alphaSync, err := s.createAlphaSync(cfg, morphClients.FrostFSClient, irf)
if err != nil { if err != nil {
return nil, err return err
} }
err = s.initNetmapProcessor(cfg, morphClients.CnrClient, alphaSync) err = s.initNetmapProcessor(cfg, morphClients.CnrClient, alphaSync)
if err != nil { if err != nil {
return nil, err return err
} }
err = s.initContainerProcessor(cfg, morphClients.CnrClient, morphClients.FrostFSIDClient) err = s.initContainerProcessor(cfg, morphClients.CnrClient, morphClients.FrostFSIDClient)
if err != nil { if err != nil {
return nil, err return err
} }
err = s.initBalanceProcessor(cfg, morphClients.FrostFSClient) err = s.initBalanceProcessor(cfg, morphClients.FrostFSClient)
if err != nil { if err != nil {
return nil, err return err
} }
err = s.initFrostFSMainnetProcessor(cfg, morphClients.FrostFSIDClient) err = s.initFrostFSMainnetProcessor(cfg, morphClients.FrostFSIDClient)
if err != nil { if err != nil {
return nil, err return err
} }
result.AlphabetProcessor, err = s.initAlphabetProcessor(cfg) err = s.initAlphabetProcessor(cfg)
if err != nil { if err != nil {
return nil, err return err
} }
return result, nil return nil
} }
func (s *Server) initMorph(ctx context.Context, cfg *viper.Viper, errChan chan<- error) (*chainParams, error) { func (s *Server) initMorph(ctx context.Context, cfg *viper.Viper, errChan chan<- error) (*chainParams, error) {

View file

@ -8,6 +8,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/config" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/config"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/alphabet"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/governance" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/governance"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap"
timerEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/timers" timerEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/timers"
@ -74,6 +75,7 @@ type (
// runtime processors // runtime processors
netmapProcessor *netmap.Processor netmapProcessor *netmap.Processor
alphabetProcessor *alphabet.Processor
workers []func(context.Context) workers []func(context.Context)
@ -383,13 +385,12 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan
return nil, err return nil, err
} }
var processors *serverProcessors err = server.initProcessors(cfg, morphClients)
processors, err = server.initProcessors(cfg, morphClients)
if err != nil { if err != nil {
return nil, err return nil, err
} }
server.initTimers(cfg, processors, morphClients) server.initTimers(cfg, morphClients)
err = server.initGRPCServer(cfg) err = server.initGRPCServer(cfg)
if err != nil { if err != nil {
@ -589,3 +590,12 @@ func (s *Server) newEpochTickHandlers() []newEpochHandler {
return newEpochHandlers return newEpochHandlers
} }
func (s *Server) SetExtraWallets(cfg *viper.Viper) error {
parsedWallets, err := parseWalletAddressesFromStrings(cfg.GetStringSlice("emit.extra_wallets"))
if err != nil {
return err
}
s.alphabetProcessor.SetParsedWallets(parsedWallets)
return nil
}

View file

@ -7,6 +7,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/encoding/fixedn" "github.com/nspcc-dev/neo-go/pkg/encoding/fixedn"
"github.com/nspcc-dev/neo-go/pkg/util"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -52,7 +53,10 @@ func (ap *Processor) processEmit() {
nmNodes := networkMap.Nodes() nmNodes := networkMap.Nodes()
nmLen := len(nmNodes) nmLen := len(nmNodes)
extraLen := len(ap.parsedWallets) ap.pwLock.RLock()
pw := ap.parsedWallets
ap.pwLock.RUnlock()
extraLen := len(pw)
ap.log.Debug(logs.AlphabetGasEmission, ap.log.Debug(logs.AlphabetGasEmission,
zap.Int("network_map", nmLen), zap.Int("network_map", nmLen),
@ -66,7 +70,7 @@ func (ap *Processor) processEmit() {
ap.transferGasToNetmapNodes(nmNodes, gasPerNode) ap.transferGasToNetmapNodes(nmNodes, gasPerNode)
ap.transferGasToExtraNodes(extraLen, gasPerNode) ap.transferGasToExtraNodes(pw, gasPerNode)
} }
func (ap *Processor) transferGasToNetmapNodes(nmNodes []netmap.NodeInfo, gasPerNode fixedn.Fixed8) { func (ap *Processor) transferGasToNetmapNodes(nmNodes []netmap.NodeInfo, gasPerNode fixedn.Fixed8) {
@ -92,12 +96,12 @@ func (ap *Processor) transferGasToNetmapNodes(nmNodes []netmap.NodeInfo, gasPerN
} }
} }
func (ap *Processor) transferGasToExtraNodes(extraLen int, gasPerNode fixedn.Fixed8) { func (ap *Processor) transferGasToExtraNodes(pw []util.Uint160, gasPerNode fixedn.Fixed8) {
if extraLen != 0 { if len(pw) > 0 {
err := ap.morphClient.BatchTransferGas(ap.parsedWallets, gasPerNode) err := ap.morphClient.BatchTransferGas(pw, gasPerNode)
if err != nil { if err != nil {
receiversLog := make([]string, extraLen) receiversLog := make([]string, len(pw))
for i, addr := range ap.parsedWallets { for i, addr := range pw {
receiversLog[i] = addr.StringLE() receiversLog[i] = addr.StringLE()
} }
ap.log.Warn(logs.AlphabetCantTransferGasToWallet, ap.log.Warn(logs.AlphabetCantTransferGasToWallet,

View file

@ -3,6 +3,7 @@ package alphabet
import ( import (
"errors" "errors"
"fmt" "fmt"
"sync"
"time" "time"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
@ -46,6 +47,8 @@ type (
// Processor of events produced for alphabet contracts in the sidechain. // Processor of events produced for alphabet contracts in the sidechain.
Processor struct { Processor struct {
parsedWallets []util.Uint160 parsedWallets []util.Uint160
// protects parsedWallets from concurrent change
pwLock *sync.RWMutex
log *logger.Logger log *logger.Logger
pool *ants.Pool pool *ants.Pool
alphabetContracts Contracts alphabetContracts Contracts
@ -88,6 +91,7 @@ func New(p *Params) (*Processor, error) {
return &Processor{ return &Processor{
parsedWallets: p.ParsedWallets, parsedWallets: p.ParsedWallets,
pwLock: new(sync.RWMutex),
log: p.Log, log: p.Log,
pool: pool, pool: pool,
alphabetContracts: p.AlphabetContracts, alphabetContracts: p.AlphabetContracts,
@ -98,6 +102,12 @@ func New(p *Params) (*Processor, error) {
}, nil }, nil
} }
func (ap *Processor) SetParsedWallets(parsedWallets []util.Uint160) {
ap.pwLock.Lock()
ap.parsedWallets = parsedWallets
ap.pwLock.Unlock()
}
// ListenerNotificationParsers for the 'event.Listener' event producer. // ListenerNotificationParsers for the 'event.Listener' event producer.
func (ap *Processor) ListenerNotificationParsers() []event.NotificationParserInfo { func (ap *Processor) ListenerNotificationParsers() []event.NotificationParserInfo {
return nil return nil