ir: Set extra wallets on SIGHUP #339

Merged
fyrchik merged 1 commit from acid-ant/frostfs-node:feature/125-ir-sighup-log into master 2023-05-16 12:47:45 +00:00
7 changed files with 61 additions and 36 deletions

View file

@ -8,6 +8,7 @@ Changelog for FrostFS Node
- Change log level on SIGHUP for ir (#125) - Change log level on SIGHUP for ir (#125)
- Reload pprof and metrics on SIGHUP for ir (#125) - Reload pprof and metrics on SIGHUP for ir (#125)
- Support copies number parameter in `frostfs-cli object put` (#351) - Support copies number parameter in `frostfs-cli object put` (#351)
- Set extra wallets on SIGHUP for ir (#125)
### Changed ### Changed
- `frostfs-cli util locode generate` is now much faster (#309) - `frostfs-cli util locode generate` is now much faster (#309)

View file

@ -62,6 +62,11 @@ func watchForSignal(cancel func()) {
} }
pprofCmp.reload() pprofCmp.reload()
metricsCmp.reload() metricsCmp.reload()
log.Info(logs.FrostFSIRReloadExtraWallets)
err = innerRing.SetExtraWallets(cfg)
if err != nil {
log.Error(logs.FrostFSNodeConfigurationReading, zap.Error(err))
}
log.Info(logs.FrostFSNodeConfigurationHasBeenReloadedSuccessfully) log.Info(logs.FrostFSNodeConfigurationHasBeenReloadedSuccessfully)
case syscall.SIGTERM, syscall.SIGINT: case syscall.SIGTERM, syscall.SIGINT:
log.Info(logs.FrostFSNodeTerminationSignalHasBeenReceivedStopping) log.Info(logs.FrostFSNodeTerminationSignalHasBeenReceivedStopping)

View file

@ -414,6 +414,7 @@ const (
FrostFSIRApplicationStopped = "application stopped" // Info in ../node/cmd/frostfs-ir/main.go FrostFSIRApplicationStopped = "application stopped" // Info in ../node/cmd/frostfs-ir/main.go
FrostFSIRCouldntCreateRPCClientForEndpoint = "could not create RPC client for endpoint" // Debug in ../node/pkg/morph/client/constructor.go FrostFSIRCouldntCreateRPCClientForEndpoint = "could not create RPC client for endpoint" // Debug in ../node/pkg/morph/client/constructor.go
FrostFSIRCreatedRPCClientForEndpoint = "created RPC client for endpoint" // Info in ../node/pkg/morph/client/constructor.go FrostFSIRCreatedRPCClientForEndpoint = "created RPC client for endpoint" // Info in ../node/pkg/morph/client/constructor.go
FrostFSIRReloadExtraWallets = "reload extra wallets" // Info in ../node/cmd/frostfs-ir/config.go
FrostFSNodeCouldNotReadCertificateFromFile = "could not read certificate from file" // Error in ../node/cmd/frostfs-node/grpc.go FrostFSNodeCouldNotReadCertificateFromFile = "could not read certificate from file" // Error in ../node/cmd/frostfs-node/grpc.go
fyrchik marked this conversation as resolved Outdated

Do we need this message if wallets haven't changed?

Do we need this message if wallets haven't changed?

That was done in the same manner as it done for pprof and metrics. Should we revise this?

That was done in the same manner as it done for pprof and metrics. Should we revise this?
FrostFSNodeCantListenGRPCEndpoint = "can't listen gRPC endpoint" // Error in ../node/cmd/frostfs-node/grpc.go FrostFSNodeCantListenGRPCEndpoint = "can't listen gRPC endpoint" // Error in ../node/cmd/frostfs-node/grpc.go
FrostFSNodeStopListeningGRPCEndpoint = "stop listening gRPC endpoint" // Info in ../node/cmd/frostfs-node/grpc.go FrostFSNodeStopListeningGRPCEndpoint = "stop listening gRPC endpoint" // Info in ../node/cmd/frostfs-node/grpc.go

View file

@ -204,7 +204,7 @@ func (s *Server) createIRFetcher() irFetcher {
return irf return irf
} }
func (s *Server) initTimers(cfg *viper.Viper, processors *serverProcessors, morphClients *serverMorphClients) { func (s *Server) initTimers(cfg *viper.Viper, morphClients *serverMorphClients) {
s.epochTimer = newEpochTimer(&epochTimerArgs{ s.epochTimer = newEpochTimer(&epochTimerArgs{
l: s.log, l: s.log,
alphabetState: s, alphabetState: s,
@ -219,21 +219,21 @@ func (s *Server) initTimers(cfg *viper.Viper, processors *serverProcessors, morp
// initialize emission timer // initialize emission timer
emissionTimer := newEmissionTimer(&emitTimerArgs{ emissionTimer := newEmissionTimer(&emitTimerArgs{
ap: processors.AlphabetProcessor, ap: s.alphabetProcessor,
emitDuration: cfg.GetUint32("timers.emit"), emitDuration: cfg.GetUint32("timers.emit"),
}) })
s.addBlockTimer(emissionTimer) s.addBlockTimer(emissionTimer)
} }
func (s *Server) initAlphabetProcessor(cfg *viper.Viper) (*alphabet.Processor, error) { func (s *Server) initAlphabetProcessor(cfg *viper.Viper) error {
parsedWallets, err := parseWalletAddressesFromStrings(cfg.GetStringSlice("emit.extra_wallets")) parsedWallets, err := parseWalletAddressesFromStrings(cfg.GetStringSlice("emit.extra_wallets"))
if err != nil { if err != nil {
return nil, err return err
} }
// create alphabet processor // create alphabet processor
alphabetProcessor, err := alphabet.New(&alphabet.Params{ s.alphabetProcessor, err = alphabet.New(&alphabet.Params{
ParsedWallets: parsedWallets, ParsedWallets: parsedWallets,
Log: s.log, Log: s.log,
PoolSize: cfg.GetInt("workers.alphabet"), PoolSize: cfg.GetInt("workers.alphabet"),
@ -244,15 +244,15 @@ func (s *Server) initAlphabetProcessor(cfg *viper.Viper) (*alphabet.Processor, e
StorageEmission: cfg.GetUint64("emit.storage.amount"), StorageEmission: cfg.GetUint64("emit.storage.amount"),
}) })
if err != nil { if err != nil {
return nil, err return err
} }
err = bindMorphProcessor(alphabetProcessor, s) err = bindMorphProcessor(s.alphabetProcessor, s)
if err != nil { if err != nil {
return nil, err return err
} }
return alphabetProcessor, nil return nil
} }
func (s *Server) initContainerProcessor(cfg *viper.Viper, cnrClient *container.Client, func (s *Server) initContainerProcessor(cfg *viper.Viper, cnrClient *container.Client,
@ -425,13 +425,7 @@ func (s *Server) initClientsFromMorph() (*serverMorphClients, error) {
return result, nil return result, nil
} }
type serverProcessors struct { func (s *Server) initProcessors(cfg *viper.Viper, morphClients *serverMorphClients) error {
AlphabetProcessor *alphabet.Processor
}
func (s *Server) initProcessors(cfg *viper.Viper, morphClients *serverMorphClients) (*serverProcessors, error) {
result := &serverProcessors{}
irf := s.createIRFetcher() irf := s.createIRFetcher()
s.statusIndex = newInnerRingIndexer( s.statusIndex = newInnerRingIndexer(
@ -443,35 +437,35 @@ func (s *Server) initProcessors(cfg *viper.Viper, morphClients *serverMorphClien
alphaSync, err := s.createAlphaSync(cfg, morphClients.FrostFSClient, irf) alphaSync, err := s.createAlphaSync(cfg, morphClients.FrostFSClient, irf)
if err != nil { if err != nil {
return nil, err return err
} }
err = s.initNetmapProcessor(cfg, morphClients.CnrClient, alphaSync) err = s.initNetmapProcessor(cfg, morphClients.CnrClient, alphaSync)
if err != nil { if err != nil {
return nil, err return err
} }
err = s.initContainerProcessor(cfg, morphClients.CnrClient, morphClients.FrostFSIDClient) err = s.initContainerProcessor(cfg, morphClients.CnrClient, morphClients.FrostFSIDClient)
if err != nil { if err != nil {
return nil, err return err
} }
err = s.initBalanceProcessor(cfg, morphClients.FrostFSClient) err = s.initBalanceProcessor(cfg, morphClients.FrostFSClient)
if err != nil { if err != nil {
return nil, err return err
} }
err = s.initFrostFSMainnetProcessor(cfg, morphClients.FrostFSIDClient) err = s.initFrostFSMainnetProcessor(cfg, morphClients.FrostFSIDClient)
if err != nil { if err != nil {
return nil, err return err
} }
result.AlphabetProcessor, err = s.initAlphabetProcessor(cfg) err = s.initAlphabetProcessor(cfg)
if err != nil { if err != nil {
return nil, err return err
} }
return result, nil return nil
} }
func (s *Server) initMorph(ctx context.Context, cfg *viper.Viper, errChan chan<- error) (*chainParams, error) { func (s *Server) initMorph(ctx context.Context, cfg *viper.Viper, errChan chan<- error) (*chainParams, error) {

View file

@ -8,6 +8,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/config" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/config"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/alphabet"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/governance" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/governance"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap"
timerEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/timers" timerEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/timers"
@ -74,6 +75,7 @@ type (
// runtime processors // runtime processors
netmapProcessor *netmap.Processor netmapProcessor *netmap.Processor
alphabetProcessor *alphabet.Processor
workers []func(context.Context) workers []func(context.Context)
@ -383,13 +385,12 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan
return nil, err return nil, err
} }
var processors *serverProcessors err = server.initProcessors(cfg, morphClients)
processors, err = server.initProcessors(cfg, morphClients)
if err != nil { if err != nil {
return nil, err return nil, err
} }
server.initTimers(cfg, processors, morphClients) server.initTimers(cfg, morphClients)
err = server.initGRPCServer(cfg) err = server.initGRPCServer(cfg)
if err != nil { if err != nil {
@ -589,3 +590,12 @@ func (s *Server) newEpochTickHandlers() []newEpochHandler {
return newEpochHandlers return newEpochHandlers
} }
func (s *Server) SetExtraWallets(cfg *viper.Viper) error {
parsedWallets, err := parseWalletAddressesFromStrings(cfg.GetStringSlice("emit.extra_wallets"))
if err != nil {
return err
}
s.alphabetProcessor.SetParsedWallets(parsedWallets)
return nil
}

View file

@ -7,6 +7,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/encoding/fixedn" "github.com/nspcc-dev/neo-go/pkg/encoding/fixedn"
"github.com/nspcc-dev/neo-go/pkg/util"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -52,7 +53,10 @@ func (ap *Processor) processEmit() {
nmNodes := networkMap.Nodes() nmNodes := networkMap.Nodes()
nmLen := len(nmNodes) nmLen := len(nmNodes)
extraLen := len(ap.parsedWallets) ap.pwLock.RLock()

Why is there no mutex taken? It seems processEmit can be called concurrently.

Why is there no mutex taken? It seems `processEmit` can be called concurrently.

Previously, mutex used to protect the whole calculation and sending gas to extra wallets. We had a decision to avoid doing something related to network under the mutex. What the reason to protect the change of pointer on array?

Previously, mutex used to protect the whole calculation and sending gas to extra wallets. We had a decision to avoid doing something related to network under the mutex. What the reason to protect the change of pointer on array?

Now I know more about slices in GO :) I'll add mutex.

Now I know more about slices in GO :) I'll add mutex.

Mutex added, please review.

Mutex added, please review.
pw := ap.parsedWallets
ap.pwLock.RUnlock()
extraLen := len(pw)
ap.log.Debug(logs.AlphabetGasEmission, ap.log.Debug(logs.AlphabetGasEmission,
zap.Int("network_map", nmLen), zap.Int("network_map", nmLen),
@ -66,7 +70,7 @@ func (ap *Processor) processEmit() {
ap.transferGasToNetmapNodes(nmNodes, gasPerNode) ap.transferGasToNetmapNodes(nmNodes, gasPerNode)
ap.transferGasToExtraNodes(extraLen, gasPerNode) ap.transferGasToExtraNodes(pw, gasPerNode)
} }
func (ap *Processor) transferGasToNetmapNodes(nmNodes []netmap.NodeInfo, gasPerNode fixedn.Fixed8) { func (ap *Processor) transferGasToNetmapNodes(nmNodes []netmap.NodeInfo, gasPerNode fixedn.Fixed8) {
@ -92,12 +96,12 @@ func (ap *Processor) transferGasToNetmapNodes(nmNodes []netmap.NodeInfo, gasPerN
} }
} }

I know that the length cannot be < 0 but let it be len(pw) > 0 :)

I know that the length cannot be `< 0` but let it be `len(pw) > 0` :)
func (ap *Processor) transferGasToExtraNodes(extraLen int, gasPerNode fixedn.Fixed8) { func (ap *Processor) transferGasToExtraNodes(pw []util.Uint160, gasPerNode fixedn.Fixed8) {
if extraLen != 0 { if len(pw) > 0 {
err := ap.morphClient.BatchTransferGas(ap.parsedWallets, gasPerNode) err := ap.morphClient.BatchTransferGas(pw, gasPerNode)
if err != nil { if err != nil {
receiversLog := make([]string, extraLen) receiversLog := make([]string, len(pw))
for i, addr := range ap.parsedWallets { for i, addr := range pw {
receiversLog[i] = addr.StringLE() receiversLog[i] = addr.StringLE()
} }
ap.log.Warn(logs.AlphabetCantTransferGasToWallet, ap.log.Warn(logs.AlphabetCantTransferGasToWallet,

View file

@ -3,6 +3,7 @@ package alphabet
import ( import (
"errors" "errors"
"fmt" "fmt"
"sync"
"time" "time"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
@ -46,6 +47,8 @@ type (
// Processor of events produced for alphabet contracts in the sidechain. // Processor of events produced for alphabet contracts in the sidechain.
Processor struct { Processor struct {
parsedWallets []util.Uint160 parsedWallets []util.Uint160
// protects parsedWallets from concurrent change
pwLock *sync.RWMutex
log *logger.Logger log *logger.Logger
pool *ants.Pool pool *ants.Pool
alphabetContracts Contracts alphabetContracts Contracts
@ -88,6 +91,7 @@ func New(p *Params) (*Processor, error) {
return &Processor{ return &Processor{
parsedWallets: p.ParsedWallets, parsedWallets: p.ParsedWallets,
pwLock: new(sync.RWMutex),
log: p.Log, log: p.Log,
pool: pool, pool: pool,
alphabetContracts: p.AlphabetContracts, alphabetContracts: p.AlphabetContracts,
@ -98,6 +102,12 @@ func New(p *Params) (*Processor, error) {
}, nil }, nil
} }
func (ap *Processor) SetParsedWallets(parsedWallets []util.Uint160) {
ap.pwLock.Lock()
ap.parsedWallets = parsedWallets
ap.pwLock.Unlock()
}
// ListenerNotificationParsers for the 'event.Listener' event producer. // ListenerNotificationParsers for the 'event.Listener' event producer.
func (ap *Processor) ListenerNotificationParsers() []event.NotificationParserInfo { func (ap *Processor) ListenerNotificationParsers() []event.NotificationParserInfo {
return nil return nil