From 6f47c75e434143d39058ceeae2a1f028e6633c0a Mon Sep 17 00:00:00 2001 From: Anton Nikiforov Date: Fri, 12 May 2023 10:41:04 +0300 Subject: [PATCH] [#125] ir: Set extra wallets on SIGHUP Signed-off-by: Anton Nikiforov --- CHANGELOG.md | 1 + cmd/frostfs-ir/config.go | 5 +++ internal/logs/logs.go | 1 + pkg/innerring/initialization.go | 42 ++++++++----------- pkg/innerring/innerring.go | 18 ++++++-- .../processors/alphabet/process_emit.go | 18 ++++---- .../processors/alphabet/processor.go | 12 +++++- 7 files changed, 61 insertions(+), 36 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c05d065fe..97feb2245 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ Changelog for FrostFS Node - Change log level on SIGHUP for ir (#125) - Reload pprof and metrics on SIGHUP for ir (#125) - Support copies number parameter in `frostfs-cli object put` (#351) +- Set extra wallets on SIGHUP for ir (#125) ### Changed - `frostfs-cli util locode generate` is now much faster (#309) diff --git a/cmd/frostfs-ir/config.go b/cmd/frostfs-ir/config.go index 2e2aa1613..54c7d18e3 100644 --- a/cmd/frostfs-ir/config.go +++ b/cmd/frostfs-ir/config.go @@ -62,6 +62,11 @@ func watchForSignal(cancel func()) { } pprofCmp.reload() metricsCmp.reload() + log.Info(logs.FrostFSIRReloadExtraWallets) + err = innerRing.SetExtraWallets(cfg) + if err != nil { + log.Error(logs.FrostFSNodeConfigurationReading, zap.Error(err)) + } log.Info(logs.FrostFSNodeConfigurationHasBeenReloadedSuccessfully) case syscall.SIGTERM, syscall.SIGINT: log.Info(logs.FrostFSNodeTerminationSignalHasBeenReceivedStopping) diff --git a/internal/logs/logs.go b/internal/logs/logs.go index 5cd1b8dba..742f6a8f7 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -414,6 +414,7 @@ const ( FrostFSIRApplicationStopped = "application stopped" // Info in ../node/cmd/frostfs-ir/main.go FrostFSIRCouldntCreateRPCClientForEndpoint = "could not create RPC client for endpoint" // Debug in ../node/pkg/morph/client/constructor.go FrostFSIRCreatedRPCClientForEndpoint = "created RPC client for endpoint" // Info in ../node/pkg/morph/client/constructor.go + FrostFSIRReloadExtraWallets = "reload extra wallets" // Info in ../node/cmd/frostfs-ir/config.go FrostFSNodeCouldNotReadCertificateFromFile = "could not read certificate from file" // Error in ../node/cmd/frostfs-node/grpc.go FrostFSNodeCantListenGRPCEndpoint = "can't listen gRPC endpoint" // Error in ../node/cmd/frostfs-node/grpc.go FrostFSNodeStopListeningGRPCEndpoint = "stop listening gRPC endpoint" // Info in ../node/cmd/frostfs-node/grpc.go diff --git a/pkg/innerring/initialization.go b/pkg/innerring/initialization.go index 1dc1d40ea..89269d50d 100644 --- a/pkg/innerring/initialization.go +++ b/pkg/innerring/initialization.go @@ -204,7 +204,7 @@ func (s *Server) createIRFetcher() irFetcher { return irf } -func (s *Server) initTimers(cfg *viper.Viper, processors *serverProcessors, morphClients *serverMorphClients) { +func (s *Server) initTimers(cfg *viper.Viper, morphClients *serverMorphClients) { s.epochTimer = newEpochTimer(&epochTimerArgs{ l: s.log, alphabetState: s, @@ -219,21 +219,21 @@ func (s *Server) initTimers(cfg *viper.Viper, processors *serverProcessors, morp // initialize emission timer emissionTimer := newEmissionTimer(&emitTimerArgs{ - ap: processors.AlphabetProcessor, + ap: s.alphabetProcessor, emitDuration: cfg.GetUint32("timers.emit"), }) s.addBlockTimer(emissionTimer) } -func (s *Server) initAlphabetProcessor(cfg *viper.Viper) (*alphabet.Processor, error) { +func (s *Server) initAlphabetProcessor(cfg *viper.Viper) error { parsedWallets, err := parseWalletAddressesFromStrings(cfg.GetStringSlice("emit.extra_wallets")) if err != nil { - return nil, err + return err } // create alphabet processor - alphabetProcessor, err := alphabet.New(&alphabet.Params{ + s.alphabetProcessor, err = alphabet.New(&alphabet.Params{ ParsedWallets: parsedWallets, Log: s.log, PoolSize: cfg.GetInt("workers.alphabet"), @@ -244,15 +244,15 @@ func (s *Server) initAlphabetProcessor(cfg *viper.Viper) (*alphabet.Processor, e StorageEmission: cfg.GetUint64("emit.storage.amount"), }) if err != nil { - return nil, err + return err } - err = bindMorphProcessor(alphabetProcessor, s) + err = bindMorphProcessor(s.alphabetProcessor, s) if err != nil { - return nil, err + return err } - return alphabetProcessor, nil + return nil } func (s *Server) initContainerProcessor(cfg *viper.Viper, cnrClient *container.Client, @@ -425,13 +425,7 @@ func (s *Server) initClientsFromMorph() (*serverMorphClients, error) { return result, nil } -type serverProcessors struct { - AlphabetProcessor *alphabet.Processor -} - -func (s *Server) initProcessors(cfg *viper.Viper, morphClients *serverMorphClients) (*serverProcessors, error) { - result := &serverProcessors{} - +func (s *Server) initProcessors(cfg *viper.Viper, morphClients *serverMorphClients) error { irf := s.createIRFetcher() s.statusIndex = newInnerRingIndexer( @@ -443,35 +437,35 @@ func (s *Server) initProcessors(cfg *viper.Viper, morphClients *serverMorphClien alphaSync, err := s.createAlphaSync(cfg, morphClients.FrostFSClient, irf) if err != nil { - return nil, err + return err } err = s.initNetmapProcessor(cfg, morphClients.CnrClient, alphaSync) if err != nil { - return nil, err + return err } err = s.initContainerProcessor(cfg, morphClients.CnrClient, morphClients.FrostFSIDClient) if err != nil { - return nil, err + return err } err = s.initBalanceProcessor(cfg, morphClients.FrostFSClient) if err != nil { - return nil, err + return err } err = s.initFrostFSMainnetProcessor(cfg, morphClients.FrostFSIDClient) if err != nil { - return nil, err + return err } - result.AlphabetProcessor, err = s.initAlphabetProcessor(cfg) + err = s.initAlphabetProcessor(cfg) if err != nil { - return nil, err + return err } - return result, nil + return nil } func (s *Server) initMorph(ctx context.Context, cfg *viper.Viper, errChan chan<- error) (*chainParams, error) { diff --git a/pkg/innerring/innerring.go b/pkg/innerring/innerring.go index 9119ff201..8c8c13dc3 100644 --- a/pkg/innerring/innerring.go +++ b/pkg/innerring/innerring.go @@ -8,6 +8,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/config" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/alphabet" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/governance" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap" timerEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/timers" @@ -73,7 +74,8 @@ type ( withoutMainNet bool // runtime processors - netmapProcessor *netmap.Processor + netmapProcessor *netmap.Processor + alphabetProcessor *alphabet.Processor workers []func(context.Context) @@ -383,13 +385,12 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan return nil, err } - var processors *serverProcessors - processors, err = server.initProcessors(cfg, morphClients) + err = server.initProcessors(cfg, morphClients) if err != nil { return nil, err } - server.initTimers(cfg, processors, morphClients) + server.initTimers(cfg, morphClients) err = server.initGRPCServer(cfg) if err != nil { @@ -589,3 +590,12 @@ func (s *Server) newEpochTickHandlers() []newEpochHandler { return newEpochHandlers } + +func (s *Server) SetExtraWallets(cfg *viper.Viper) error { + parsedWallets, err := parseWalletAddressesFromStrings(cfg.GetStringSlice("emit.extra_wallets")) + if err != nil { + return err + } + s.alphabetProcessor.SetParsedWallets(parsedWallets) + return nil +} diff --git a/pkg/innerring/processors/alphabet/process_emit.go b/pkg/innerring/processors/alphabet/process_emit.go index b8d65dbc5..7a268ac52 100644 --- a/pkg/innerring/processors/alphabet/process_emit.go +++ b/pkg/innerring/processors/alphabet/process_emit.go @@ -7,6 +7,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/encoding/fixedn" + "github.com/nspcc-dev/neo-go/pkg/util" "go.uber.org/zap" ) @@ -52,7 +53,10 @@ func (ap *Processor) processEmit() { nmNodes := networkMap.Nodes() nmLen := len(nmNodes) - extraLen := len(ap.parsedWallets) + ap.pwLock.RLock() + pw := ap.parsedWallets + ap.pwLock.RUnlock() + extraLen := len(pw) ap.log.Debug(logs.AlphabetGasEmission, zap.Int("network_map", nmLen), @@ -66,7 +70,7 @@ func (ap *Processor) processEmit() { ap.transferGasToNetmapNodes(nmNodes, gasPerNode) - ap.transferGasToExtraNodes(extraLen, gasPerNode) + ap.transferGasToExtraNodes(pw, gasPerNode) } func (ap *Processor) transferGasToNetmapNodes(nmNodes []netmap.NodeInfo, gasPerNode fixedn.Fixed8) { @@ -92,12 +96,12 @@ func (ap *Processor) transferGasToNetmapNodes(nmNodes []netmap.NodeInfo, gasPerN } } -func (ap *Processor) transferGasToExtraNodes(extraLen int, gasPerNode fixedn.Fixed8) { - if extraLen != 0 { - err := ap.morphClient.BatchTransferGas(ap.parsedWallets, gasPerNode) +func (ap *Processor) transferGasToExtraNodes(pw []util.Uint160, gasPerNode fixedn.Fixed8) { + if len(pw) > 0 { + err := ap.morphClient.BatchTransferGas(pw, gasPerNode) if err != nil { - receiversLog := make([]string, extraLen) - for i, addr := range ap.parsedWallets { + receiversLog := make([]string, len(pw)) + for i, addr := range pw { receiversLog[i] = addr.StringLE() } ap.log.Warn(logs.AlphabetCantTransferGasToWallet, diff --git a/pkg/innerring/processors/alphabet/processor.go b/pkg/innerring/processors/alphabet/processor.go index c2d7c1164..cd9088e03 100644 --- a/pkg/innerring/processors/alphabet/processor.go +++ b/pkg/innerring/processors/alphabet/processor.go @@ -3,6 +3,7 @@ package alphabet import ( "errors" "fmt" + "sync" "time" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" @@ -45,7 +46,9 @@ type ( // Processor of events produced for alphabet contracts in the sidechain. Processor struct { - parsedWallets []util.Uint160 + parsedWallets []util.Uint160 + // protects parsedWallets from concurrent change + pwLock *sync.RWMutex log *logger.Logger pool *ants.Pool alphabetContracts Contracts @@ -88,6 +91,7 @@ func New(p *Params) (*Processor, error) { return &Processor{ parsedWallets: p.ParsedWallets, + pwLock: new(sync.RWMutex), log: p.Log, pool: pool, alphabetContracts: p.AlphabetContracts, @@ -98,6 +102,12 @@ func New(p *Params) (*Processor, error) { }, nil } +func (ap *Processor) SetParsedWallets(parsedWallets []util.Uint160) { + ap.pwLock.Lock() + ap.parsedWallets = parsedWallets + ap.pwLock.Unlock() +} + // ListenerNotificationParsers for the 'event.Listener' event producer. func (ap *Processor) ListenerNotificationParsers() []event.NotificationParserInfo { return nil