516 lines
15 KiB
Go
516 lines
15 KiB
Go
package innerring
|
|
|
|
import (
|
|
"context"
|
|
"encoding/hex"
|
|
"fmt"
|
|
"net"
|
|
"sync/atomic"
|
|
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/alphabet"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/balance"
|
|
cont "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/container"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/frostfs"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/governance"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap"
|
|
nodevalidator "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap/nodevalidation"
|
|
addrvalidator "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap/nodevalidation/maddress"
|
|
statevalidation "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap/nodevalidation/state"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
|
|
balanceClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/balance"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container"
|
|
frostfsClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/frostfs"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/frostfsid"
|
|
nmClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
|
|
control "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/ir"
|
|
controlsrv "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/ir/server"
|
|
utilConfig "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/config"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
|
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
|
|
"github.com/nspcc-dev/neo-go/pkg/encoding/fixedn"
|
|
"github.com/spf13/viper"
|
|
"go.uber.org/zap"
|
|
"google.golang.org/grpc"
|
|
)
|
|
|
|
func (s *Server) initNetmapProcessor(ctx context.Context, cfg *viper.Viper,
|
|
alphaSync event.Handler,
|
|
) error {
|
|
locodeValidator, err := s.newLocodeValidator(cfg)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
netSettings := (*networkSettings)(s.netmapClient)
|
|
|
|
var netMapCandidateStateValidator statevalidation.NetMapCandidateValidator
|
|
netMapCandidateStateValidator.SetNetworkSettings(netSettings)
|
|
|
|
poolSize := cfg.GetInt("workers.netmap")
|
|
s.log.Debug(ctx, logs.NetmapNetmapWorkerPool, zap.Int("size", poolSize))
|
|
|
|
s.netmapProcessor, err = netmap.New(&netmap.Params{
|
|
Log: s.log,
|
|
Metrics: s.irMetrics,
|
|
PoolSize: poolSize,
|
|
NetmapClient: netmap.NewNetmapClient(s.netmapClient),
|
|
EpochTimer: s,
|
|
EpochState: s,
|
|
AlphabetState: s,
|
|
CleanupEnabled: cfg.GetBool("netmap_cleaner.enabled"),
|
|
CleanupThreshold: cfg.GetUint64("netmap_cleaner.threshold"),
|
|
NotaryDepositHandler: s.onlyAlphabetEventHandler(
|
|
s.notaryHandler,
|
|
),
|
|
AlphabetSyncHandler: s.onlyAlphabetEventHandler(
|
|
alphaSync,
|
|
),
|
|
NodeValidator: nodevalidator.New(
|
|
&netMapCandidateStateValidator,
|
|
addrvalidator.New(),
|
|
locodeValidator,
|
|
),
|
|
|
|
NodeStateSettings: netSettings,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return bindMorphProcessor(s.netmapProcessor, s)
|
|
}
|
|
|
|
func (s *Server) initMainnet(ctx context.Context, cfg *viper.Viper, morphChain *chainParams, errChan chan<- error) error {
|
|
s.withoutMainNet = cfg.GetBool("without_mainnet")
|
|
if s.withoutMainNet {
|
|
// This works as long as event Listener starts listening loop once,
|
|
// otherwise Server.Start will run two similar routines.
|
|
// This behavior most likely will not change.
|
|
s.mainnetListener = s.morphListener
|
|
s.mainnetClient = s.morphClient
|
|
return nil
|
|
}
|
|
|
|
mainnetChain := morphChain
|
|
mainnetChain.name = mainnetPrefix
|
|
mainnetChain.sgn = &transaction.Signer{Scopes: transaction.CalledByEntry}
|
|
|
|
fromMainChainBlock, err := s.persistate.UInt32(persistateMainChainLastBlockKey)
|
|
if err != nil {
|
|
fromMainChainBlock = 0
|
|
s.log.Warn(ctx, logs.InnerringCantGetLastProcessedMainChainBlockNumber, zap.String("error", err.Error()))
|
|
}
|
|
mainnetChain.from = fromMainChainBlock
|
|
|
|
// create mainnet client
|
|
s.mainnetClient, err = createClient(ctx, mainnetChain, errChan)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// create mainnet listener
|
|
s.mainnetListener, err = createListener(ctx, s.mainnetClient, mainnetChain)
|
|
return err
|
|
}
|
|
|
|
func (s *Server) enableNotarySupport() error {
|
|
// enable notary support in the side client
|
|
err := s.morphClient.EnableNotarySupport(
|
|
client.WithProxyContract(s.contracts.proxy),
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("could not enable side chain notary support: %w", err)
|
|
}
|
|
|
|
s.morphListener.EnableNotarySupport(s.contracts.proxy, s.morphClient.Committee, s.morphClient)
|
|
|
|
if !s.mainNotaryConfig.disabled {
|
|
// enable notary support in the main client
|
|
err := s.mainnetClient.EnableNotarySupport(
|
|
client.WithProxyContract(s.contracts.processing),
|
|
client.WithAlphabetSource(s.morphClient.Committee),
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("could not enable main chain notary support: %w", err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *Server) initNotaryConfig(ctx context.Context) {
|
|
s.mainNotaryConfig = notaryConfigs(
|
|
!s.withoutMainNet && s.mainnetClient.ProbeNotary(), // if mainnet disabled then notary flag must be disabled too
|
|
)
|
|
|
|
s.log.Info(ctx, logs.InnerringNotarySupport,
|
|
zap.Bool("sidechain_enabled", true),
|
|
zap.Bool("mainchain_enabled", !s.mainNotaryConfig.disabled),
|
|
)
|
|
}
|
|
|
|
func (s *Server) createAlphaSync(cfg *viper.Viper, frostfsCli *frostfsClient.Client, irf irFetcher) (event.Handler, error) {
|
|
var alphaSync event.Handler
|
|
|
|
if s.withoutMainNet || cfg.GetBool("governance.disable") {
|
|
alphaSync = func(ctx context.Context, _ event.Event) {
|
|
s.log.Debug(ctx, logs.InnerringAlphabetKeysSyncIsDisabled)
|
|
}
|
|
} else {
|
|
// create governance processor
|
|
governanceProcessor, err := governance.New(&governance.Params{
|
|
Log: s.log,
|
|
Metrics: s.irMetrics,
|
|
FrostFSClient: frostfsCli,
|
|
AlphabetState: s,
|
|
EpochState: s,
|
|
Voter: s,
|
|
IRFetcher: irf,
|
|
MorphClient: s.morphClient,
|
|
MainnetClient: s.mainnetClient,
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
alphaSync = governanceProcessor.HandleAlphabetSync
|
|
err = bindMainnetProcessor(governanceProcessor, s)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
return alphaSync, nil
|
|
}
|
|
|
|
func (s *Server) createIRFetcher() irFetcher {
|
|
var irf irFetcher
|
|
|
|
if s.withoutMainNet || !s.mainNotaryConfig.disabled {
|
|
// if mainchain is disabled we should use NeoFSAlphabetList client method according to its docs
|
|
// (naming `...WithNotary` will not always be correct)
|
|
irf = NewIRFetcherWithNotary(s.morphClient)
|
|
} else {
|
|
irf = NewIRFetcherWithoutNotary(s.netmapClient)
|
|
}
|
|
|
|
return irf
|
|
}
|
|
|
|
func (s *Server) initTimers(ctx context.Context, cfg *viper.Viper) {
|
|
s.epochTimer = newEpochTimer(&epochTimerArgs{
|
|
newEpochHandlers: s.newEpochTickHandlers(ctx),
|
|
epoch: s,
|
|
})
|
|
|
|
s.addBlockTimer(s.epochTimer)
|
|
|
|
// initialize emission timer
|
|
emissionTimer := newEmissionTimer(ctx, &emitTimerArgs{
|
|
ap: s.alphabetProcessor,
|
|
emitDuration: cfg.GetUint32("timers.emit"),
|
|
})
|
|
|
|
s.addBlockTimer(emissionTimer)
|
|
}
|
|
|
|
func (s *Server) initAlphabetProcessor(ctx context.Context, cfg *viper.Viper) error {
|
|
parsedWallets, err := parseWalletAddressesFromStrings(cfg.GetStringSlice("emit.extra_wallets"))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
poolSize := cfg.GetInt("workers.alphabet")
|
|
s.log.Debug(ctx, logs.AlphabetAlphabetWorkerPool, zap.Int("size", poolSize))
|
|
|
|
// create alphabet processor
|
|
s.alphabetProcessor, err = alphabet.New(&alphabet.Params{
|
|
ParsedWallets: parsedWallets,
|
|
Log: s.log,
|
|
Metrics: s.irMetrics,
|
|
PoolSize: poolSize,
|
|
AlphabetContracts: s.contracts.alphabet,
|
|
NetmapClient: s.netmapClient,
|
|
MorphClient: s.morphClient,
|
|
IRList: s,
|
|
StorageEmission: cfg.GetUint64("emit.storage.amount"),
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = bindMorphProcessor(s.alphabetProcessor, s)
|
|
return err
|
|
}
|
|
|
|
func (s *Server) initContainerProcessor(ctx context.Context, cfg *viper.Viper, cnrClient *container.Client, frostfsIDClient *frostfsid.Client) error {
|
|
poolSize := cfg.GetInt("workers.container")
|
|
s.log.Debug(ctx, logs.ContainerContainerWorkerPool, zap.Int("size", poolSize))
|
|
// container processor
|
|
containerProcessor, err := cont.New(&cont.Params{
|
|
Log: s.log,
|
|
Metrics: s.irMetrics,
|
|
PoolSize: poolSize,
|
|
AlphabetState: s,
|
|
ContainerClient: cnrClient,
|
|
MorphClient: cnrClient.Morph(),
|
|
FrostFSIDClient: frostfsIDClient,
|
|
NetworkState: s.netmapClient,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return bindMorphProcessor(containerProcessor, s)
|
|
}
|
|
|
|
func (s *Server) initBalanceProcessor(ctx context.Context, cfg *viper.Viper, frostfsCli *frostfsClient.Client) error {
|
|
poolSize := cfg.GetInt("workers.balance")
|
|
s.log.Debug(ctx, logs.BalanceBalanceWorkerPool, zap.Int("size", poolSize))
|
|
// create balance processor
|
|
balanceProcessor, err := balance.New(&balance.Params{
|
|
Log: s.log,
|
|
Metrics: s.irMetrics,
|
|
PoolSize: poolSize,
|
|
FrostFSClient: frostfsCli,
|
|
BalanceSC: s.contracts.balance,
|
|
AlphabetState: s,
|
|
Converter: &s.precision,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return bindMorphProcessor(balanceProcessor, s)
|
|
}
|
|
|
|
func (s *Server) initFrostFSMainnetProcessor(ctx context.Context, cfg *viper.Viper) error {
|
|
if s.withoutMainNet {
|
|
return nil
|
|
}
|
|
poolSize := cfg.GetInt("workers.frostfs")
|
|
s.log.Debug(ctx, logs.FrostFSFrostfsWorkerPool, zap.Int("size", poolSize))
|
|
|
|
frostfsProcessor, err := frostfs.New(&frostfs.Params{
|
|
Log: s.log,
|
|
Metrics: s.irMetrics,
|
|
PoolSize: poolSize,
|
|
FrostFSContract: s.contracts.frostfs,
|
|
BalanceClient: s.balanceClient,
|
|
NetmapClient: s.netmapClient,
|
|
MorphClient: s.morphClient,
|
|
EpochState: s,
|
|
AlphabetState: s,
|
|
Converter: &s.precision,
|
|
MintEmitCacheSize: cfg.GetInt("emit.mint.cache_size"),
|
|
MintEmitThreshold: cfg.GetUint64("emit.mint.threshold"),
|
|
MintEmitValue: fixedn.Fixed8(cfg.GetInt64("emit.mint.value")),
|
|
GasBalanceThreshold: cfg.GetInt64("emit.gas.balance_threshold"),
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return bindMainnetProcessor(frostfsProcessor, s)
|
|
}
|
|
|
|
func (s *Server) initGRPCServer(ctx context.Context, cfg *viper.Viper, log *logger.Logger, audit *atomic.Bool) error {
|
|
controlSvcEndpoint := cfg.GetString("control.grpc.endpoint")
|
|
if controlSvcEndpoint == "" {
|
|
s.log.Info(ctx, logs.InnerringNoControlServerEndpointSpecified)
|
|
return nil
|
|
}
|
|
|
|
authKeysStr := cfg.GetStringSlice("control.authorized_keys")
|
|
authKeys := make([][]byte, 0, len(authKeysStr))
|
|
|
|
for i := range authKeysStr {
|
|
key, err := hex.DecodeString(authKeysStr[i])
|
|
if err != nil {
|
|
return fmt.Errorf("could not parse Control authorized key %s: %w",
|
|
authKeysStr[i],
|
|
err,
|
|
)
|
|
}
|
|
|
|
authKeys = append(authKeys, key)
|
|
}
|
|
|
|
var p controlsrv.Prm
|
|
|
|
p.SetPrivateKey(*s.key)
|
|
p.SetHealthChecker(s)
|
|
|
|
controlSvc := controlsrv.NewAuditService(controlsrv.New(p, s.netmapClient, s.containerClient,
|
|
controlsrv.WithAllowedKeys(authKeys),
|
|
), log, audit)
|
|
|
|
grpcControlSrv := grpc.NewServer()
|
|
control.RegisterControlServiceServer(grpcControlSrv, controlSvc)
|
|
|
|
s.runners = append(s.runners, func(ch chan<- error) error {
|
|
lis, err := net.Listen("tcp", controlSvcEndpoint)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
go func() {
|
|
ch <- grpcControlSrv.Serve(lis)
|
|
}()
|
|
return nil
|
|
})
|
|
|
|
s.registerNoErrCloser(grpcControlSrv.GracefulStop)
|
|
return nil
|
|
}
|
|
|
|
type serverMorphClients struct {
|
|
CnrClient *container.Client
|
|
FrostFSIDClient *frostfsid.Client
|
|
FrostFSClient *frostfsClient.Client
|
|
}
|
|
|
|
func (s *Server) initClientsFromMorph() (*serverMorphClients, error) {
|
|
result := &serverMorphClients{}
|
|
var err error
|
|
|
|
fee := s.feeConfig.SideChainFee()
|
|
|
|
// form morph container client's options
|
|
morphCnrOpts := make([]container.Option, 0, 3)
|
|
morphCnrOpts = append(morphCnrOpts,
|
|
container.TryNotary(),
|
|
container.AsAlphabet(),
|
|
)
|
|
|
|
result.CnrClient, err = container.NewFromMorph(s.morphClient, s.contracts.container, fee, morphCnrOpts...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
s.containerClient = result.CnrClient
|
|
|
|
s.netmapClient, err = nmClient.NewFromMorph(s.morphClient, s.contracts.netmap, fee, nmClient.TryNotary(), nmClient.AsAlphabet())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
s.balanceClient, err = balanceClient.NewFromMorph(s.morphClient, s.contracts.balance, fee, balanceClient.TryNotary(), balanceClient.AsAlphabet())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
result.FrostFSIDClient, err = frostfsid.NewFromMorph(s.morphClient, s.contracts.frostfsID, fee)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
result.FrostFSClient, err = frostfsClient.NewFromMorph(s.mainnetClient, s.contracts.frostfs,
|
|
s.feeConfig.MainChainFee(), frostfsClient.TryNotary(), frostfsClient.AsAlphabet())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
func (s *Server) initProcessors(ctx context.Context, cfg *viper.Viper, morphClients *serverMorphClients) error {
|
|
irf := s.createIRFetcher()
|
|
|
|
s.statusIndex = newInnerRingIndexer(
|
|
s.morphClient,
|
|
irf,
|
|
s.key.PublicKey(),
|
|
cfg.GetDuration("indexer.cache_timeout"),
|
|
)
|
|
|
|
alphaSync, err := s.createAlphaSync(cfg, morphClients.FrostFSClient, irf)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = s.initNetmapProcessor(ctx, cfg, alphaSync)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = s.initContainerProcessor(ctx, cfg, morphClients.CnrClient, morphClients.FrostFSIDClient)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = s.initBalanceProcessor(ctx, cfg, morphClients.FrostFSClient)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = s.initFrostFSMainnetProcessor(ctx, cfg)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = s.initAlphabetProcessor(ctx, cfg)
|
|
return err
|
|
}
|
|
|
|
func (s *Server) initMorph(ctx context.Context, cfg *viper.Viper, errChan chan<- error) (*chainParams, error) {
|
|
fromSideChainBlock, err := s.persistate.UInt32(persistateSideChainLastBlockKey)
|
|
if err != nil {
|
|
fromSideChainBlock = 0
|
|
s.log.Warn(ctx, logs.InnerringCantGetLastProcessedSideChainBlockNumber, zap.String("error", err.Error()))
|
|
}
|
|
|
|
morphChain := &chainParams{
|
|
log: s.log,
|
|
cfg: cfg,
|
|
key: s.key,
|
|
name: morphPrefix,
|
|
from: fromSideChainBlock,
|
|
morphCacheMetric: s.irMetrics.MorphCacheMetrics(),
|
|
multinetMetrics: s.irMetrics.Multinet(),
|
|
}
|
|
|
|
// create morph client
|
|
s.morphClient, err = createClient(ctx, morphChain, errChan)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// create morph listener
|
|
s.morphListener, err = createListener(ctx, s.morphClient, morphChain)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if err := s.morphClient.SetGroupSignerScope(); err != nil {
|
|
morphChain.log.Info(ctx, logs.InnerringFailedToSetGroupSignerScope, zap.Error(err))
|
|
}
|
|
|
|
return morphChain, nil
|
|
}
|
|
|
|
func (s *Server) initContracts(cfg *viper.Viper) error {
|
|
var err error
|
|
// get all script hashes of contracts
|
|
s.contracts, err = parseContracts(
|
|
cfg,
|
|
s.morphClient,
|
|
s.withoutMainNet,
|
|
s.mainNotaryConfig.disabled,
|
|
)
|
|
|
|
return err
|
|
}
|
|
|
|
func (s *Server) initKey(cfg *viper.Viper) error {
|
|
// prepare inner ring node private key
|
|
acc, err := utilConfig.LoadAccount(
|
|
cfg.GetString("wallet.path"),
|
|
cfg.GetString("wallet.address"),
|
|
cfg.GetString("wallet.password"))
|
|
if err != nil {
|
|
return fmt.Errorf("ir: %w", err)
|
|
}
|
|
|
|
s.key = acc.PrivateKey()
|
|
return nil
|
|
}
|