ir: Set extra wallets on SIGHUP #339
7 changed files with 61 additions and 36 deletions
|
@ -8,6 +8,7 @@ Changelog for FrostFS Node
|
|||
- Change log level on SIGHUP for ir (#125)
|
||||
- Reload pprof and metrics on SIGHUP for ir (#125)
|
||||
- Support copies number parameter in `frostfs-cli object put` (#351)
|
||||
- Set extra wallets on SIGHUP for ir (#125)
|
||||
|
||||
### Changed
|
||||
- `frostfs-cli util locode generate` is now much faster (#309)
|
||||
|
|
|
@ -62,6 +62,11 @@ func watchForSignal(cancel func()) {
|
|||
}
|
||||
pprofCmp.reload()
|
||||
metricsCmp.reload()
|
||||
log.Info(logs.FrostFSIRReloadExtraWallets)
|
||||
err = innerRing.SetExtraWallets(cfg)
|
||||
if err != nil {
|
||||
log.Error(logs.FrostFSNodeConfigurationReading, zap.Error(err))
|
||||
}
|
||||
log.Info(logs.FrostFSNodeConfigurationHasBeenReloadedSuccessfully)
|
||||
case syscall.SIGTERM, syscall.SIGINT:
|
||||
log.Info(logs.FrostFSNodeTerminationSignalHasBeenReceivedStopping)
|
||||
|
|
|
@ -414,6 +414,7 @@ const (
|
|||
FrostFSIRApplicationStopped = "application stopped" // Info in ../node/cmd/frostfs-ir/main.go
|
||||
FrostFSIRCouldntCreateRPCClientForEndpoint = "could not create RPC client for endpoint" // Debug in ../node/pkg/morph/client/constructor.go
|
||||
FrostFSIRCreatedRPCClientForEndpoint = "created RPC client for endpoint" // Info in ../node/pkg/morph/client/constructor.go
|
||||
FrostFSIRReloadExtraWallets = "reload extra wallets" // Info in ../node/cmd/frostfs-ir/config.go
|
||||
FrostFSNodeCouldNotReadCertificateFromFile = "could not read certificate from file" // Error in ../node/cmd/frostfs-node/grpc.go
|
||||
fyrchik marked this conversation as resolved
Outdated
|
||||
FrostFSNodeCantListenGRPCEndpoint = "can't listen gRPC endpoint" // Error in ../node/cmd/frostfs-node/grpc.go
|
||||
FrostFSNodeStopListeningGRPCEndpoint = "stop listening gRPC endpoint" // Info in ../node/cmd/frostfs-node/grpc.go
|
||||
|
|
|
@ -204,7 +204,7 @@ func (s *Server) createIRFetcher() irFetcher {
|
|||
return irf
|
||||
}
|
||||
|
||||
func (s *Server) initTimers(cfg *viper.Viper, processors *serverProcessors, morphClients *serverMorphClients) {
|
||||
func (s *Server) initTimers(cfg *viper.Viper, morphClients *serverMorphClients) {
|
||||
s.epochTimer = newEpochTimer(&epochTimerArgs{
|
||||
l: s.log,
|
||||
alphabetState: s,
|
||||
|
@ -219,21 +219,21 @@ func (s *Server) initTimers(cfg *viper.Viper, processors *serverProcessors, morp
|
|||
|
||||
// initialize emission timer
|
||||
emissionTimer := newEmissionTimer(&emitTimerArgs{
|
||||
ap: processors.AlphabetProcessor,
|
||||
ap: s.alphabetProcessor,
|
||||
emitDuration: cfg.GetUint32("timers.emit"),
|
||||
})
|
||||
|
||||
s.addBlockTimer(emissionTimer)
|
||||
}
|
||||
|
||||
func (s *Server) initAlphabetProcessor(cfg *viper.Viper) (*alphabet.Processor, error) {
|
||||
func (s *Server) initAlphabetProcessor(cfg *viper.Viper) error {
|
||||
parsedWallets, err := parseWalletAddressesFromStrings(cfg.GetStringSlice("emit.extra_wallets"))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return err
|
||||
}
|
||||
|
||||
// create alphabet processor
|
||||
alphabetProcessor, err := alphabet.New(&alphabet.Params{
|
||||
s.alphabetProcessor, err = alphabet.New(&alphabet.Params{
|
||||
ParsedWallets: parsedWallets,
|
||||
Log: s.log,
|
||||
PoolSize: cfg.GetInt("workers.alphabet"),
|
||||
|
@ -244,15 +244,15 @@ func (s *Server) initAlphabetProcessor(cfg *viper.Viper) (*alphabet.Processor, e
|
|||
StorageEmission: cfg.GetUint64("emit.storage.amount"),
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return err
|
||||
}
|
||||
|
||||
err = bindMorphProcessor(alphabetProcessor, s)
|
||||
err = bindMorphProcessor(s.alphabetProcessor, s)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return err
|
||||
}
|
||||
|
||||
return alphabetProcessor, nil
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Server) initContainerProcessor(cfg *viper.Viper, cnrClient *container.Client,
|
||||
|
@ -425,13 +425,7 @@ func (s *Server) initClientsFromMorph() (*serverMorphClients, error) {
|
|||
return result, nil
|
||||
}
|
||||
|
||||
type serverProcessors struct {
|
||||
AlphabetProcessor *alphabet.Processor
|
||||
}
|
||||
|
||||
func (s *Server) initProcessors(cfg *viper.Viper, morphClients *serverMorphClients) (*serverProcessors, error) {
|
||||
result := &serverProcessors{}
|
||||
|
||||
func (s *Server) initProcessors(cfg *viper.Viper, morphClients *serverMorphClients) error {
|
||||
irf := s.createIRFetcher()
|
||||
|
||||
s.statusIndex = newInnerRingIndexer(
|
||||
|
@ -443,35 +437,35 @@ func (s *Server) initProcessors(cfg *viper.Viper, morphClients *serverMorphClien
|
|||
|
||||
alphaSync, err := s.createAlphaSync(cfg, morphClients.FrostFSClient, irf)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return err
|
||||
}
|
||||
|
||||
err = s.initNetmapProcessor(cfg, morphClients.CnrClient, alphaSync)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return err
|
||||
}
|
||||
|
||||
err = s.initContainerProcessor(cfg, morphClients.CnrClient, morphClients.FrostFSIDClient)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return err
|
||||
}
|
||||
|
||||
err = s.initBalanceProcessor(cfg, morphClients.FrostFSClient)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return err
|
||||
}
|
||||
|
||||
err = s.initFrostFSMainnetProcessor(cfg, morphClients.FrostFSIDClient)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return err
|
||||
}
|
||||
|
||||
result.AlphabetProcessor, err = s.initAlphabetProcessor(cfg)
|
||||
err = s.initAlphabetProcessor(cfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return err
|
||||
}
|
||||
|
||||
return result, nil
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Server) initMorph(ctx context.Context, cfg *viper.Viper, errChan chan<- error) (*chainParams, error) {
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/config"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/alphabet"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/governance"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap"
|
||||
timerEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/timers"
|
||||
|
@ -73,7 +74,8 @@ type (
|
|||
withoutMainNet bool
|
||||
|
||||
// runtime processors
|
||||
netmapProcessor *netmap.Processor
|
||||
netmapProcessor *netmap.Processor
|
||||
alphabetProcessor *alphabet.Processor
|
||||
|
||||
workers []func(context.Context)
|
||||
|
||||
|
@ -383,13 +385,12 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan
|
|||
return nil, err
|
||||
}
|
||||
|
||||
var processors *serverProcessors
|
||||
processors, err = server.initProcessors(cfg, morphClients)
|
||||
err = server.initProcessors(cfg, morphClients)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
server.initTimers(cfg, processors, morphClients)
|
||||
server.initTimers(cfg, morphClients)
|
||||
|
||||
err = server.initGRPCServer(cfg)
|
||||
if err != nil {
|
||||
|
@ -589,3 +590,12 @@ func (s *Server) newEpochTickHandlers() []newEpochHandler {
|
|||
|
||||
return newEpochHandlers
|
||||
}
|
||||
|
||||
func (s *Server) SetExtraWallets(cfg *viper.Viper) error {
|
||||
parsedWallets, err := parseWalletAddressesFromStrings(cfg.GetStringSlice("emit.extra_wallets"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s.alphabetProcessor.SetParsedWallets(parsedWallets)
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
|
||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||
"github.com/nspcc-dev/neo-go/pkg/encoding/fixedn"
|
||||
"github.com/nspcc-dev/neo-go/pkg/util"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
|
@ -52,7 +53,10 @@ func (ap *Processor) processEmit() {
|
|||
|
||||
nmNodes := networkMap.Nodes()
|
||||
nmLen := len(nmNodes)
|
||||
extraLen := len(ap.parsedWallets)
|
||||
ap.pwLock.RLock()
|
||||
fyrchik
commented
Why is there no mutex taken? It seems Why is there no mutex taken? It seems `processEmit` can be called concurrently.
acid-ant
commented
Previously, mutex used to protect the whole calculation and sending gas to extra wallets. We had a decision to avoid doing something related to network under the mutex. What the reason to protect the change of pointer on array? Previously, mutex used to protect the whole calculation and sending gas to extra wallets. We had a decision to avoid doing something related to network under the mutex. What the reason to protect the change of pointer on array?
acid-ant
commented
Now I know more about slices in GO :) I'll add mutex. Now I know more about slices in GO :) I'll add mutex.
acid-ant
commented
Mutex added, please review. Mutex added, please review.
|
||||
pw := ap.parsedWallets
|
||||
ap.pwLock.RUnlock()
|
||||
extraLen := len(pw)
|
||||
|
||||
ap.log.Debug(logs.AlphabetGasEmission,
|
||||
zap.Int("network_map", nmLen),
|
||||
|
@ -66,7 +70,7 @@ func (ap *Processor) processEmit() {
|
|||
|
||||
ap.transferGasToNetmapNodes(nmNodes, gasPerNode)
|
||||
|
||||
ap.transferGasToExtraNodes(extraLen, gasPerNode)
|
||||
ap.transferGasToExtraNodes(pw, gasPerNode)
|
||||
}
|
||||
|
||||
func (ap *Processor) transferGasToNetmapNodes(nmNodes []netmap.NodeInfo, gasPerNode fixedn.Fixed8) {
|
||||
|
@ -92,12 +96,12 @@ func (ap *Processor) transferGasToNetmapNodes(nmNodes []netmap.NodeInfo, gasPerN
|
|||
}
|
||||
}
|
||||
|
||||
aarifullin
commented
I know that the length cannot be I know that the length cannot be `< 0` but let it be `len(pw) > 0` :)
|
||||
func (ap *Processor) transferGasToExtraNodes(extraLen int, gasPerNode fixedn.Fixed8) {
|
||||
if extraLen != 0 {
|
||||
err := ap.morphClient.BatchTransferGas(ap.parsedWallets, gasPerNode)
|
||||
func (ap *Processor) transferGasToExtraNodes(pw []util.Uint160, gasPerNode fixedn.Fixed8) {
|
||||
if len(pw) > 0 {
|
||||
err := ap.morphClient.BatchTransferGas(pw, gasPerNode)
|
||||
if err != nil {
|
||||
receiversLog := make([]string, extraLen)
|
||||
for i, addr := range ap.parsedWallets {
|
||||
receiversLog := make([]string, len(pw))
|
||||
for i, addr := range pw {
|
||||
receiversLog[i] = addr.StringLE()
|
||||
}
|
||||
ap.log.Warn(logs.AlphabetCantTransferGasToWallet,
|
||||
|
|
|
@ -3,6 +3,7 @@ package alphabet
|
|||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
|
@ -45,7 +46,9 @@ type (
|
|||
|
||||
// Processor of events produced for alphabet contracts in the sidechain.
|
||||
Processor struct {
|
||||
parsedWallets []util.Uint160
|
||||
parsedWallets []util.Uint160
|
||||
// protects parsedWallets from concurrent change
|
||||
pwLock *sync.RWMutex
|
||||
log *logger.Logger
|
||||
pool *ants.Pool
|
||||
alphabetContracts Contracts
|
||||
|
@ -88,6 +91,7 @@ func New(p *Params) (*Processor, error) {
|
|||
|
||||
return &Processor{
|
||||
parsedWallets: p.ParsedWallets,
|
||||
pwLock: new(sync.RWMutex),
|
||||
log: p.Log,
|
||||
pool: pool,
|
||||
alphabetContracts: p.AlphabetContracts,
|
||||
|
@ -98,6 +102,12 @@ func New(p *Params) (*Processor, error) {
|
|||
}, nil
|
||||
}
|
||||
|
||||
func (ap *Processor) SetParsedWallets(parsedWallets []util.Uint160) {
|
||||
ap.pwLock.Lock()
|
||||
ap.parsedWallets = parsedWallets
|
||||
ap.pwLock.Unlock()
|
||||
}
|
||||
|
||||
// ListenerNotificationParsers for the 'event.Listener' event producer.
|
||||
func (ap *Processor) ListenerNotificationParsers() []event.NotificationParserInfo {
|
||||
return nil
|
||||
|
|
Loading…
Add table
Reference in a new issue
Do we need this message if wallets haven't changed?
That was done in the same manner as it done for pprof and metrics. Should we revise this?