ir: Set extra wallets on SIGHUP #339

Merged
fyrchik merged 1 commit from acid-ant/frostfs-node:feature/125-ir-sighup-log into master 2023-05-16 12:47:45 +00:00
7 changed files with 61 additions and 36 deletions

View file

@ -8,6 +8,7 @@ Changelog for FrostFS Node
- Change log level on SIGHUP for ir (#125)
- Reload pprof and metrics on SIGHUP for ir (#125)
- Support copies number parameter in `frostfs-cli object put` (#351)
- Set extra wallets on SIGHUP for ir (#125)
### Changed
- `frostfs-cli util locode generate` is now much faster (#309)

View file

@ -62,6 +62,11 @@ func watchForSignal(cancel func()) {
}
pprofCmp.reload()
metricsCmp.reload()
log.Info(logs.FrostFSIRReloadExtraWallets)
err = innerRing.SetExtraWallets(cfg)
if err != nil {
log.Error(logs.FrostFSNodeConfigurationReading, zap.Error(err))
}
log.Info(logs.FrostFSNodeConfigurationHasBeenReloadedSuccessfully)
case syscall.SIGTERM, syscall.SIGINT:
log.Info(logs.FrostFSNodeTerminationSignalHasBeenReceivedStopping)

View file

@ -414,6 +414,7 @@ const (
FrostFSIRApplicationStopped = "application stopped" // Info in ../node/cmd/frostfs-ir/main.go
FrostFSIRCouldntCreateRPCClientForEndpoint = "could not create RPC client for endpoint" // Debug in ../node/pkg/morph/client/constructor.go
FrostFSIRCreatedRPCClientForEndpoint = "created RPC client for endpoint" // Info in ../node/pkg/morph/client/constructor.go
FrostFSIRReloadExtraWallets = "reload extra wallets" // Info in ../node/cmd/frostfs-ir/config.go
FrostFSNodeCouldNotReadCertificateFromFile = "could not read certificate from file" // Error in ../node/cmd/frostfs-node/grpc.go
fyrchik marked this conversation as resolved Outdated

Do we need this message if wallets haven't changed?

Do we need this message if wallets haven't changed?

That was done in the same manner as it done for pprof and metrics. Should we revise this?

That was done in the same manner as it done for pprof and metrics. Should we revise this?
FrostFSNodeCantListenGRPCEndpoint = "can't listen gRPC endpoint" // Error in ../node/cmd/frostfs-node/grpc.go
FrostFSNodeStopListeningGRPCEndpoint = "stop listening gRPC endpoint" // Info in ../node/cmd/frostfs-node/grpc.go

View file

@ -204,7 +204,7 @@ func (s *Server) createIRFetcher() irFetcher {
return irf
}
func (s *Server) initTimers(cfg *viper.Viper, processors *serverProcessors, morphClients *serverMorphClients) {
func (s *Server) initTimers(cfg *viper.Viper, morphClients *serverMorphClients) {
s.epochTimer = newEpochTimer(&epochTimerArgs{
l: s.log,
alphabetState: s,
@ -219,21 +219,21 @@ func (s *Server) initTimers(cfg *viper.Viper, processors *serverProcessors, morp
// initialize emission timer
emissionTimer := newEmissionTimer(&emitTimerArgs{
ap: processors.AlphabetProcessor,
ap: s.alphabetProcessor,
emitDuration: cfg.GetUint32("timers.emit"),
})
s.addBlockTimer(emissionTimer)
}
func (s *Server) initAlphabetProcessor(cfg *viper.Viper) (*alphabet.Processor, error) {
func (s *Server) initAlphabetProcessor(cfg *viper.Viper) error {
parsedWallets, err := parseWalletAddressesFromStrings(cfg.GetStringSlice("emit.extra_wallets"))
if err != nil {
return nil, err
return err
}
// create alphabet processor
alphabetProcessor, err := alphabet.New(&alphabet.Params{
s.alphabetProcessor, err = alphabet.New(&alphabet.Params{
ParsedWallets: parsedWallets,
Log: s.log,
PoolSize: cfg.GetInt("workers.alphabet"),
@ -244,15 +244,15 @@ func (s *Server) initAlphabetProcessor(cfg *viper.Viper) (*alphabet.Processor, e
StorageEmission: cfg.GetUint64("emit.storage.amount"),
})
if err != nil {
return nil, err
return err
}
err = bindMorphProcessor(alphabetProcessor, s)
err = bindMorphProcessor(s.alphabetProcessor, s)
if err != nil {
return nil, err
return err
}
return alphabetProcessor, nil
return nil
}
func (s *Server) initContainerProcessor(cfg *viper.Viper, cnrClient *container.Client,
@ -425,13 +425,7 @@ func (s *Server) initClientsFromMorph() (*serverMorphClients, error) {
return result, nil
}
type serverProcessors struct {
AlphabetProcessor *alphabet.Processor
}
func (s *Server) initProcessors(cfg *viper.Viper, morphClients *serverMorphClients) (*serverProcessors, error) {
result := &serverProcessors{}
func (s *Server) initProcessors(cfg *viper.Viper, morphClients *serverMorphClients) error {
irf := s.createIRFetcher()
s.statusIndex = newInnerRingIndexer(
@ -443,35 +437,35 @@ func (s *Server) initProcessors(cfg *viper.Viper, morphClients *serverMorphClien
alphaSync, err := s.createAlphaSync(cfg, morphClients.FrostFSClient, irf)
if err != nil {
return nil, err
return err
}
err = s.initNetmapProcessor(cfg, morphClients.CnrClient, alphaSync)
if err != nil {
return nil, err
return err
}
err = s.initContainerProcessor(cfg, morphClients.CnrClient, morphClients.FrostFSIDClient)
if err != nil {
return nil, err
return err
}
err = s.initBalanceProcessor(cfg, morphClients.FrostFSClient)
if err != nil {
return nil, err
return err
}
err = s.initFrostFSMainnetProcessor(cfg, morphClients.FrostFSIDClient)
if err != nil {
return nil, err
return err
}
result.AlphabetProcessor, err = s.initAlphabetProcessor(cfg)
err = s.initAlphabetProcessor(cfg)
if err != nil {
return nil, err
return err
}
return result, nil
return nil
}
func (s *Server) initMorph(ctx context.Context, cfg *viper.Viper, errChan chan<- error) (*chainParams, error) {

View file

@ -8,6 +8,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/config"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/alphabet"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/governance"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap"
timerEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/timers"
@ -73,7 +74,8 @@ type (
withoutMainNet bool
// runtime processors
netmapProcessor *netmap.Processor
netmapProcessor *netmap.Processor
alphabetProcessor *alphabet.Processor
workers []func(context.Context)
@ -383,13 +385,12 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan
return nil, err
}
var processors *serverProcessors
processors, err = server.initProcessors(cfg, morphClients)
err = server.initProcessors(cfg, morphClients)
if err != nil {
return nil, err
}
server.initTimers(cfg, processors, morphClients)
server.initTimers(cfg, morphClients)
err = server.initGRPCServer(cfg)
if err != nil {
@ -589,3 +590,12 @@ func (s *Server) newEpochTickHandlers() []newEpochHandler {
return newEpochHandlers
}
func (s *Server) SetExtraWallets(cfg *viper.Viper) error {
parsedWallets, err := parseWalletAddressesFromStrings(cfg.GetStringSlice("emit.extra_wallets"))
if err != nil {
return err
}
s.alphabetProcessor.SetParsedWallets(parsedWallets)
return nil
}

View file

@ -7,6 +7,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/encoding/fixedn"
"github.com/nspcc-dev/neo-go/pkg/util"
"go.uber.org/zap"
)
@ -52,7 +53,10 @@ func (ap *Processor) processEmit() {
nmNodes := networkMap.Nodes()
nmLen := len(nmNodes)
extraLen := len(ap.parsedWallets)
ap.pwLock.RLock()

Why is there no mutex taken? It seems processEmit can be called concurrently.

Why is there no mutex taken? It seems `processEmit` can be called concurrently.

Previously, mutex used to protect the whole calculation and sending gas to extra wallets. We had a decision to avoid doing something related to network under the mutex. What the reason to protect the change of pointer on array?

Previously, mutex used to protect the whole calculation and sending gas to extra wallets. We had a decision to avoid doing something related to network under the mutex. What the reason to protect the change of pointer on array?

Now I know more about slices in GO :) I'll add mutex.

Now I know more about slices in GO :) I'll add mutex.

Mutex added, please review.

Mutex added, please review.
pw := ap.parsedWallets
ap.pwLock.RUnlock()
extraLen := len(pw)
ap.log.Debug(logs.AlphabetGasEmission,
zap.Int("network_map", nmLen),
@ -66,7 +70,7 @@ func (ap *Processor) processEmit() {
ap.transferGasToNetmapNodes(nmNodes, gasPerNode)
ap.transferGasToExtraNodes(extraLen, gasPerNode)
ap.transferGasToExtraNodes(pw, gasPerNode)
}
func (ap *Processor) transferGasToNetmapNodes(nmNodes []netmap.NodeInfo, gasPerNode fixedn.Fixed8) {
@ -92,12 +96,12 @@ func (ap *Processor) transferGasToNetmapNodes(nmNodes []netmap.NodeInfo, gasPerN
}
}

I know that the length cannot be < 0 but let it be len(pw) > 0 :)

I know that the length cannot be `< 0` but let it be `len(pw) > 0` :)
func (ap *Processor) transferGasToExtraNodes(extraLen int, gasPerNode fixedn.Fixed8) {
if extraLen != 0 {
err := ap.morphClient.BatchTransferGas(ap.parsedWallets, gasPerNode)
func (ap *Processor) transferGasToExtraNodes(pw []util.Uint160, gasPerNode fixedn.Fixed8) {
if len(pw) > 0 {
err := ap.morphClient.BatchTransferGas(pw, gasPerNode)
if err != nil {
receiversLog := make([]string, extraLen)
for i, addr := range ap.parsedWallets {
receiversLog := make([]string, len(pw))
for i, addr := range pw {
receiversLog[i] = addr.StringLE()
}
ap.log.Warn(logs.AlphabetCantTransferGasToWallet,

View file

@ -3,6 +3,7 @@ package alphabet
import (
"errors"
"fmt"
"sync"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
@ -45,7 +46,9 @@ type (
// Processor of events produced for alphabet contracts in the sidechain.
Processor struct {
parsedWallets []util.Uint160
parsedWallets []util.Uint160
// protects parsedWallets from concurrent change
pwLock *sync.RWMutex
log *logger.Logger
pool *ants.Pool
alphabetContracts Contracts
@ -88,6 +91,7 @@ func New(p *Params) (*Processor, error) {
return &Processor{
parsedWallets: p.ParsedWallets,
pwLock: new(sync.RWMutex),
log: p.Log,
pool: pool,
alphabetContracts: p.AlphabetContracts,
@ -98,6 +102,12 @@ func New(p *Params) (*Processor, error) {
}, nil
}
func (ap *Processor) SetParsedWallets(parsedWallets []util.Uint160) {
ap.pwLock.Lock()
ap.parsedWallets = parsedWallets
ap.pwLock.Unlock()
}
// ListenerNotificationParsers for the 'event.Listener' event producer.
func (ap *Processor) ListenerNotificationParsers() []event.NotificationParserInfo {
return nil