Compare commits
8 commits
master
...
refactorin
Author | SHA1 | Date | |
---|---|---|---|
d161f83613 | |||
b38858c8a6 | |||
dc42b0c18b | |||
c51388fabc | |||
4c0c962b29 | |||
3c1796a3d1 | |||
fe822dda3f | |||
a24931205f |
10 changed files with 1072 additions and 887 deletions
|
@ -52,96 +52,56 @@ const (
|
|||
lastLetterNum
|
||||
)
|
||||
|
||||
var glagolicLetterToString = map[GlagoliticLetter]string{
|
||||
az: "az",
|
||||
buky: "buky",
|
||||
vedi: "vedi",
|
||||
glagoli: "glagoli",
|
||||
dobro: "dobro",
|
||||
yest: "yest",
|
||||
zhivete: "zhivete",
|
||||
dzelo: "dzelo",
|
||||
zemlja: "zemlja",
|
||||
izhe: "izhe",
|
||||
izhei: "izhei",
|
||||
gerv: "gerv",
|
||||
kako: "kako",
|
||||
ljudi: "ljudi",
|
||||
mislete: "mislete",
|
||||
nash: "nash",
|
||||
on: "on",
|
||||
pokoj: "pokoj",
|
||||
rtsi: "rtsi",
|
||||
slovo: "slovo",
|
||||
tverdo: "tverdo",
|
||||
uk: "uk",
|
||||
fert: "fert",
|
||||
kher: "kher",
|
||||
oht: "oht",
|
||||
shta: "shta",
|
||||
tsi: "tsi",
|
||||
cherv: "cherv",
|
||||
sha: "sha",
|
||||
yer: "yer",
|
||||
yeri: "yeri",
|
||||
yerj: "yerj",
|
||||
yat: "yat",
|
||||
jo: "jo",
|
||||
yu: "yu",
|
||||
smallYus: "small.yus",
|
||||
smallIotatedYus: "small.iotated.yus",
|
||||
bigYus: "big.yus",
|
||||
bigIotatedYus: "big.iotated.yus",
|
||||
fita: "fita",
|
||||
izhitsa: "izhitsa",
|
||||
}
|
||||
|
||||
// String returns l in config-compatible format.
|
||||
//
|
||||
// nolint: funlen
|
||||
func (l GlagoliticLetter) String() string {
|
||||
switch l {
|
||||
default:
|
||||
return "unknown"
|
||||
case az:
|
||||
return "az"
|
||||
case buky:
|
||||
return "buky"
|
||||
case vedi:
|
||||
return "vedi"
|
||||
case glagoli:
|
||||
return "glagoli"
|
||||
case dobro:
|
||||
return "dobro"
|
||||
case yest:
|
||||
return "yest"
|
||||
case zhivete:
|
||||
return "zhivete"
|
||||
case dzelo:
|
||||
return "dzelo"
|
||||
case zemlja:
|
||||
return "zemlja"
|
||||
case izhe:
|
||||
return "izhe"
|
||||
case izhei:
|
||||
return "izhei"
|
||||
case gerv:
|
||||
return "gerv"
|
||||
case kako:
|
||||
return "kako"
|
||||
case ljudi:
|
||||
return "ljudi"
|
||||
case mislete:
|
||||
return "mislete"
|
||||
case nash:
|
||||
return "nash"
|
||||
case on:
|
||||
return "on"
|
||||
case pokoj:
|
||||
return "pokoj"
|
||||
case rtsi:
|
||||
return "rtsi"
|
||||
case slovo:
|
||||
return "slovo"
|
||||
case tverdo:
|
||||
return "tverdo"
|
||||
case uk:
|
||||
return "uk"
|
||||
case fert:
|
||||
return "fert"
|
||||
case kher:
|
||||
return "kher"
|
||||
case oht:
|
||||
return "oht"
|
||||
case shta:
|
||||
return "shta"
|
||||
case tsi:
|
||||
return "tsi"
|
||||
case cherv:
|
||||
return "cherv"
|
||||
case sha:
|
||||
return "sha"
|
||||
case yer:
|
||||
return "yer"
|
||||
case yeri:
|
||||
return "yeri"
|
||||
case yerj:
|
||||
return "yerj"
|
||||
case yat:
|
||||
return "yat"
|
||||
case jo:
|
||||
return "jo"
|
||||
case yu:
|
||||
return "yu"
|
||||
case smallYus:
|
||||
return "small.yus"
|
||||
case smallIotatedYus:
|
||||
return "small.iotated.yus"
|
||||
case bigYus:
|
||||
return "big.yus"
|
||||
case bigIotatedYus:
|
||||
return "big.iotated.yus"
|
||||
case fita:
|
||||
return "fita"
|
||||
case izhitsa:
|
||||
return "izhitsa"
|
||||
if str, found := glagolicLetterToString[l]; found {
|
||||
return str
|
||||
}
|
||||
return "unknown"
|
||||
}
|
||||
|
||||
type alphabetContracts map[GlagoliticLetter]util.Uint160
|
||||
|
|
756
pkg/innerring/initialization.go
Normal file
756
pkg/innerring/initialization.go
Normal file
|
@ -0,0 +1,756 @@
|
|||
package innerring
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"net"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/alphabet"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/audit"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/balance"
|
||||
cont "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/container"
|
||||
frostfs "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/frostfs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/governance"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap"
|
||||
nodevalidator "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap/nodevalidation"
|
||||
addrvalidator "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap/nodevalidation/maddress"
|
||||
statevalidation "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap/nodevalidation/state"
|
||||
subnetvalidator "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap/nodevalidation/subnet"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/reputation"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/settlement"
|
||||
auditSettlement "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/settlement/audit"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/metrics"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
|
||||
auditClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/audit"
|
||||
balanceClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/balance"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container"
|
||||
cntClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container"
|
||||
frostfsClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/frostfs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/frostfsid"
|
||||
nmClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
|
||||
repClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/reputation"
|
||||
morphsubnet "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/subnet"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
|
||||
audittask "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/audit/taskmanager"
|
||||
control "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/ir"
|
||||
controlsrv "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/ir/server"
|
||||
reputationcommon "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/common"
|
||||
util2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
||||
utilConfig "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/config"
|
||||
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
|
||||
"github.com/nspcc-dev/neo-go/pkg/encoding/fixedn"
|
||||
"github.com/panjf2000/ants/v2"
|
||||
"github.com/spf13/viper"
|
||||
"go.uber.org/zap"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
func (s *Server) initNetmapProcessor(cfg *viper.Viper,
|
||||
cnrClient *container.Client,
|
||||
alphaSync event.Handler,
|
||||
subnetClient *morphsubnet.Client,
|
||||
auditProcessor *audit.Processor,
|
||||
settlementProcessor *settlement.Processor) error {
|
||||
locodeValidator, err := s.newLocodeValidator(cfg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
subnetValidator, err := subnetvalidator.New(
|
||||
subnetvalidator.Prm{
|
||||
SubnetClient: subnetClient,
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
netSettings := (*networkSettings)(s.netmapClient)
|
||||
|
||||
var netMapCandidateStateValidator statevalidation.NetMapCandidateValidator
|
||||
netMapCandidateStateValidator.SetNetworkSettings(netSettings)
|
||||
|
||||
s.netmapProcessor, err = netmap.New(&netmap.Params{
|
||||
Log: s.log,
|
||||
PoolSize: cfg.GetInt("workers.netmap"),
|
||||
NetmapClient: s.netmapClient,
|
||||
EpochTimer: s,
|
||||
EpochState: s,
|
||||
AlphabetState: s,
|
||||
CleanupEnabled: cfg.GetBool("netmap_cleaner.enabled"),
|
||||
CleanupThreshold: cfg.GetUint64("netmap_cleaner.threshold"),
|
||||
ContainerWrapper: cnrClient,
|
||||
HandleAudit: s.onlyActiveEventHandler(
|
||||
auditProcessor.StartAuditHandler(),
|
||||
),
|
||||
NotaryDepositHandler: s.onlyAlphabetEventHandler(
|
||||
s.notaryHandler,
|
||||
),
|
||||
AuditSettlementsHandler: s.onlyAlphabetEventHandler(
|
||||
settlementProcessor.HandleAuditEvent,
|
||||
),
|
||||
AlphabetSyncHandler: alphaSync,
|
||||
NodeValidator: nodevalidator.New(
|
||||
&netMapCandidateStateValidator,
|
||||
addrvalidator.New(),
|
||||
locodeValidator,
|
||||
subnetValidator,
|
||||
),
|
||||
NotaryDisabled: s.sideNotaryConfig.disabled,
|
||||
SubnetContract: &s.contracts.subnet,
|
||||
|
||||
NodeStateSettings: netSettings,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return bindMorphProcessor(s.netmapProcessor, s)
|
||||
}
|
||||
|
||||
func (s *Server) initMainnet(ctx context.Context, cfg *viper.Viper, morphChain *chainParams, errChan chan<- error) error {
|
||||
s.withoutMainNet = cfg.GetBool("without_mainnet")
|
||||
if s.withoutMainNet {
|
||||
// This works as long as event Listener starts listening loop once,
|
||||
// otherwise Server.Start will run two similar routines.
|
||||
// This behavior most likely will not change.
|
||||
s.mainnetListener = s.morphListener
|
||||
s.mainnetClient = s.morphClient
|
||||
return nil
|
||||
}
|
||||
|
||||
mainnetChain := morphChain
|
||||
mainnetChain.name = mainnetPrefix
|
||||
mainnetChain.sgn = &transaction.Signer{Scopes: transaction.CalledByEntry}
|
||||
|
||||
fromMainChainBlock, err := s.persistate.UInt32(persistateMainChainLastBlockKey)
|
||||
if err != nil {
|
||||
fromMainChainBlock = 0
|
||||
s.log.Warn("can't get last processed main chain block number", zap.String("error", err.Error()))
|
||||
}
|
||||
mainnetChain.from = fromMainChainBlock
|
||||
|
||||
// create mainnet client
|
||||
s.mainnetClient, err = createClient(ctx, mainnetChain, errChan)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// create mainnet listener
|
||||
s.mainnetListener, err = createListener(ctx, s.mainnetClient, mainnetChain)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *Server) enableNotarySupport() error {
|
||||
if !s.sideNotaryConfig.disabled {
|
||||
// enable notary support in the side client
|
||||
err := s.morphClient.EnableNotarySupport(
|
||||
client.WithProxyContract(s.contracts.proxy),
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not enable side chain notary support: %w", err)
|
||||
}
|
||||
|
||||
s.morphListener.EnableNotarySupport(s.contracts.proxy, s.morphClient.Committee, s.morphClient)
|
||||
}
|
||||
|
||||
if !s.mainNotaryConfig.disabled {
|
||||
// enable notary support in the main client
|
||||
err := s.mainnetClient.EnableNotarySupport(
|
||||
client.WithProxyContract(s.contracts.processing),
|
||||
client.WithAlphabetSource(s.morphClient.Committee),
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not enable main chain notary support: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Server) initNotaryConfig(cfg *viper.Viper) {
|
||||
s.mainNotaryConfig, s.sideNotaryConfig = notaryConfigs(
|
||||
s.morphClient.ProbeNotary(),
|
||||
!s.withoutMainNet && s.mainnetClient.ProbeNotary(), // if mainnet disabled then notary flag must be disabled too
|
||||
)
|
||||
|
||||
s.log.Info("notary support",
|
||||
zap.Bool("sidechain_enabled", !s.sideNotaryConfig.disabled),
|
||||
zap.Bool("mainchain_enabled", !s.mainNotaryConfig.disabled),
|
||||
)
|
||||
}
|
||||
|
||||
func (s *Server) createAuditProcessor(cfg *viper.Viper, clientCache *ClientCache, cnrClient *container.Client) (*audit.Processor, error) {
|
||||
auditPool, err := ants.NewPool(cfg.GetInt("audit.task.exec_pool_size"))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
pdpPoolSize := cfg.GetInt("audit.pdp.pairs_pool_size")
|
||||
porPoolSize := cfg.GetInt("audit.por.pool_size")
|
||||
|
||||
// create audit processor dependencies
|
||||
auditTaskManager := audittask.New(
|
||||
audittask.WithQueueCapacity(cfg.GetUint32("audit.task.queue_capacity")),
|
||||
audittask.WithWorkerPool(auditPool),
|
||||
audittask.WithLogger(s.log),
|
||||
audittask.WithContainerCommunicator(clientCache),
|
||||
audittask.WithMaxPDPSleepInterval(cfg.GetDuration("audit.pdp.max_sleep_interval")),
|
||||
audittask.WithPDPWorkerPoolGenerator(func() (util2.WorkerPool, error) {
|
||||
return ants.NewPool(pdpPoolSize)
|
||||
}),
|
||||
audittask.WithPoRWorkerPoolGenerator(func() (util2.WorkerPool, error) {
|
||||
return ants.NewPool(porPoolSize)
|
||||
}),
|
||||
)
|
||||
|
||||
s.workers = append(s.workers, auditTaskManager.Listen)
|
||||
|
||||
// create audit processor
|
||||
return audit.New(&audit.Params{
|
||||
Log: s.log,
|
||||
NetmapClient: s.netmapClient,
|
||||
ContainerClient: cnrClient,
|
||||
IRList: s,
|
||||
EpochSource: s,
|
||||
SGSource: clientCache,
|
||||
Key: &s.key.PrivateKey,
|
||||
RPCSearchTimeout: cfg.GetDuration("audit.timeout.search"),
|
||||
TaskManager: auditTaskManager,
|
||||
Reporter: s,
|
||||
})
|
||||
}
|
||||
|
||||
func (s *Server) createSettlementProcessor(clientCache *ClientCache, cnrClient *container.Client) *settlement.Processor {
|
||||
// create settlement processor dependencies
|
||||
settlementDeps := settlementDeps{
|
||||
log: s.log,
|
||||
cnrSrc: cntClient.AsContainerSource(cnrClient),
|
||||
auditClient: s.auditClient,
|
||||
nmClient: s.netmapClient,
|
||||
clientCache: clientCache,
|
||||
balanceClient: s.balanceClient,
|
||||
}
|
||||
|
||||
settlementDeps.settlementCtx = auditSettlementContext
|
||||
auditCalcDeps := &auditSettlementDeps{
|
||||
settlementDeps: settlementDeps,
|
||||
}
|
||||
|
||||
settlementDeps.settlementCtx = basicIncomeSettlementContext
|
||||
basicSettlementDeps := &basicIncomeSettlementDeps{
|
||||
settlementDeps: settlementDeps,
|
||||
cnrClient: cnrClient,
|
||||
}
|
||||
|
||||
auditSettlementCalc := auditSettlement.NewCalculator(
|
||||
&auditSettlement.CalculatorPrm{
|
||||
ResultStorage: auditCalcDeps,
|
||||
ContainerStorage: auditCalcDeps,
|
||||
PlacementCalculator: auditCalcDeps,
|
||||
SGStorage: auditCalcDeps,
|
||||
AccountStorage: auditCalcDeps,
|
||||
Exchanger: auditCalcDeps,
|
||||
AuditFeeFetcher: s.netmapClient,
|
||||
},
|
||||
auditSettlement.WithLogger(s.log),
|
||||
)
|
||||
|
||||
// create settlement processor
|
||||
return settlement.New(
|
||||
settlement.Prm{
|
||||
AuditProcessor: (*auditSettlementCalculator)(auditSettlementCalc),
|
||||
BasicIncome: &basicSettlementConstructor{dep: basicSettlementDeps},
|
||||
State: s,
|
||||
},
|
||||
settlement.WithLogger(s.log),
|
||||
)
|
||||
}
|
||||
|
||||
func (s *Server) createAlphaSync(cfg *viper.Viper, frostfsCli *frostfsClient.Client, irf irFetcher) (event.Handler, error) {
|
||||
var alphaSync event.Handler
|
||||
|
||||
if s.withoutMainNet || cfg.GetBool("governance.disable") {
|
||||
alphaSync = func(event.Event) {
|
||||
s.log.Debug("alphabet keys sync is disabled")
|
||||
}
|
||||
} else {
|
||||
// create governance processor
|
||||
governanceProcessor, err := governance.New(&governance.Params{
|
||||
Log: s.log,
|
||||
FrostFSClient: frostfsCli,
|
||||
NetmapClient: s.netmapClient,
|
||||
AlphabetState: s,
|
||||
EpochState: s,
|
||||
Voter: s,
|
||||
IRFetcher: irf,
|
||||
MorphClient: s.morphClient,
|
||||
MainnetClient: s.mainnetClient,
|
||||
NotaryDisabled: s.sideNotaryConfig.disabled,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
alphaSync = governanceProcessor.HandleAlphabetSync
|
||||
err = bindMainnetProcessor(governanceProcessor, s)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return alphaSync, nil
|
||||
}
|
||||
|
||||
func (s *Server) createIRFetcher() irFetcher {
|
||||
var irf irFetcher
|
||||
|
||||
if s.withoutMainNet || !s.mainNotaryConfig.disabled {
|
||||
// if mainchain is disabled we should use NeoFSAlphabetList client method according to its docs
|
||||
// (naming `...WithNotary` will not always be correct)
|
||||
irf = NewIRFetcherWithNotary(s.morphClient)
|
||||
} else {
|
||||
irf = NewIRFetcherWithoutNotary(s.netmapClient)
|
||||
}
|
||||
|
||||
return irf
|
||||
}
|
||||
|
||||
func (s *Server) initTimers(cfg *viper.Viper, processors *serverProcessors, morphClients *serverMorphClients) {
|
||||
s.epochTimer = newEpochTimer(&epochTimerArgs{
|
||||
l: s.log,
|
||||
newEpochHandlers: s.newEpochTickHandlers(),
|
||||
cnrWrapper: morphClients.CnrClient,
|
||||
epoch: s,
|
||||
stopEstimationDMul: cfg.GetUint32("timers.stop_estimation.mul"),
|
||||
stopEstimationDDiv: cfg.GetUint32("timers.stop_estimation.div"),
|
||||
collectBasicIncome: subEpochEventHandler{
|
||||
handler: processors.SettlementProcessor.HandleIncomeCollectionEvent,
|
||||
durationMul: cfg.GetUint32("timers.collect_basic_income.mul"),
|
||||
durationDiv: cfg.GetUint32("timers.collect_basic_income.div"),
|
||||
},
|
||||
distributeBasicIncome: subEpochEventHandler{
|
||||
handler: processors.SettlementProcessor.HandleIncomeDistributionEvent,
|
||||
durationMul: cfg.GetUint32("timers.distribute_basic_income.mul"),
|
||||
durationDiv: cfg.GetUint32("timers.distribute_basic_income.div"),
|
||||
},
|
||||
})
|
||||
|
||||
s.addBlockTimer(s.epochTimer)
|
||||
|
||||
// initialize emission timer
|
||||
emissionTimer := newEmissionTimer(&emitTimerArgs{
|
||||
ap: processors.AlphabetProcessor,
|
||||
emitDuration: cfg.GetUint32("timers.emit"),
|
||||
})
|
||||
|
||||
s.addBlockTimer(emissionTimer)
|
||||
}
|
||||
|
||||
func (s *Server) createMorphSubnetClient() (*morphsubnet.Client, error) {
|
||||
// initialize morph client of Subnet contract
|
||||
clientMode := morphsubnet.NotaryAlphabet
|
||||
|
||||
if s.sideNotaryConfig.disabled {
|
||||
clientMode = morphsubnet.NonNotary
|
||||
}
|
||||
|
||||
subnetInitPrm := morphsubnet.InitPrm{}
|
||||
subnetInitPrm.SetBaseClient(s.morphClient)
|
||||
subnetInitPrm.SetContractAddress(s.contracts.subnet)
|
||||
subnetInitPrm.SetMode(clientMode)
|
||||
|
||||
subnetClient := &morphsubnet.Client{}
|
||||
err := subnetClient.Init(subnetInitPrm)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not initialize subnet client: %w", err)
|
||||
}
|
||||
return subnetClient, nil
|
||||
}
|
||||
|
||||
func (s *Server) initAlphabetProcessor(cfg *viper.Viper) (*alphabet.Processor, error) {
|
||||
parsedWallets, err := parseWalletAddressesFromStrings(cfg.GetStringSlice("emit.extra_wallets"))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// create alphabet processor
|
||||
alphabetProcessor, err := alphabet.New(&alphabet.Params{
|
||||
ParsedWallets: parsedWallets,
|
||||
Log: s.log,
|
||||
PoolSize: cfg.GetInt("workers.alphabet"),
|
||||
AlphabetContracts: s.contracts.alphabet,
|
||||
NetmapClient: s.netmapClient,
|
||||
MorphClient: s.morphClient,
|
||||
IRList: s,
|
||||
StorageEmission: cfg.GetUint64("emit.storage.amount"),
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = bindMorphProcessor(alphabetProcessor, s)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return alphabetProcessor, nil
|
||||
}
|
||||
|
||||
func (s *Server) initContainerProcessor(cfg *viper.Viper, cnrClient *container.Client,
|
||||
frostfsIDClient *frostfsid.Client, subnetClient *morphsubnet.Client) error {
|
||||
// container processor
|
||||
containerProcessor, err := cont.New(&cont.Params{
|
||||
Log: s.log,
|
||||
PoolSize: cfg.GetInt("workers.container"),
|
||||
AlphabetState: s,
|
||||
ContainerClient: cnrClient,
|
||||
FrostFSIDClient: frostfsIDClient,
|
||||
NetworkState: s.netmapClient,
|
||||
NotaryDisabled: s.sideNotaryConfig.disabled,
|
||||
SubnetClient: subnetClient,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return bindMorphProcessor(containerProcessor, s)
|
||||
}
|
||||
|
||||
func (s *Server) initBalanceProcessor(cfg *viper.Viper, frostfsCli *frostfsClient.Client) error {
|
||||
// create balance processor
|
||||
balanceProcessor, err := balance.New(&balance.Params{
|
||||
Log: s.log,
|
||||
PoolSize: cfg.GetInt("workers.balance"),
|
||||
FrostFSClient: frostfsCli,
|
||||
BalanceSC: s.contracts.balance,
|
||||
AlphabetState: s,
|
||||
Converter: &s.precision,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return bindMorphProcessor(balanceProcessor, s)
|
||||
}
|
||||
|
||||
func (s *Server) initFrostFSMainnetProcessor(cfg *viper.Viper, frostfsIDClient *frostfsid.Client) error {
|
||||
if s.withoutMainNet {
|
||||
return nil
|
||||
}
|
||||
|
||||
frostfsProcessor, err := frostfs.New(&frostfs.Params{
|
||||
Log: s.log,
|
||||
PoolSize: cfg.GetInt("workers.frostfs"),
|
||||
FrostFSContract: s.contracts.frostfs,
|
||||
FrostFSIDClient: frostfsIDClient,
|
||||
BalanceClient: s.balanceClient,
|
||||
NetmapClient: s.netmapClient,
|
||||
MorphClient: s.morphClient,
|
||||
EpochState: s,
|
||||
AlphabetState: s,
|
||||
Converter: &s.precision,
|
||||
MintEmitCacheSize: cfg.GetInt("emit.mint.cache_size"),
|
||||
MintEmitThreshold: cfg.GetUint64("emit.mint.threshold"),
|
||||
MintEmitValue: fixedn.Fixed8(cfg.GetInt64("emit.mint.value")),
|
||||
GasBalanceThreshold: cfg.GetInt64("emit.gas.balance_threshold"),
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return bindMainnetProcessor(frostfsProcessor, s)
|
||||
}
|
||||
|
||||
func (s *Server) initReputationProcessor(cfg *viper.Viper, sidechainFee fixedn.Fixed8) error {
|
||||
repClient, err := repClient.NewFromMorph(s.morphClient, s.contracts.reputation, sidechainFee, repClient.TryNotary(), repClient.AsAlphabet())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// create reputation processor
|
||||
reputationProcessor, err := reputation.New(&reputation.Params{
|
||||
Log: s.log,
|
||||
PoolSize: cfg.GetInt("workers.reputation"),
|
||||
EpochState: s,
|
||||
AlphabetState: s,
|
||||
ReputationWrapper: repClient,
|
||||
ManagerBuilder: reputationcommon.NewManagerBuilder(
|
||||
reputationcommon.ManagersPrm{
|
||||
NetMapSource: s.netmapClient,
|
||||
},
|
||||
),
|
||||
NotaryDisabled: s.sideNotaryConfig.disabled,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return bindMorphProcessor(reputationProcessor, s)
|
||||
}
|
||||
|
||||
func (s *Server) initGRPCServer(cfg *viper.Viper) error {
|
||||
controlSvcEndpoint := cfg.GetString("control.grpc.endpoint")
|
||||
if controlSvcEndpoint == "" {
|
||||
s.log.Info("no Control server endpoint specified, service is disabled")
|
||||
return nil
|
||||
}
|
||||
|
||||
authKeysStr := cfg.GetStringSlice("control.authorized_keys")
|
||||
authKeys := make([][]byte, 0, len(authKeysStr))
|
||||
|
||||
for i := range authKeysStr {
|
||||
key, err := hex.DecodeString(authKeysStr[i])
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not parse Control authorized key %s: %w",
|
||||
authKeysStr[i],
|
||||
err,
|
||||
)
|
||||
}
|
||||
|
||||
authKeys = append(authKeys, key)
|
||||
}
|
||||
|
||||
var p controlsrv.Prm
|
||||
|
||||
p.SetPrivateKey(*s.key)
|
||||
p.SetHealthChecker(s)
|
||||
|
||||
controlSvc := controlsrv.New(p,
|
||||
controlsrv.WithAllowedKeys(authKeys),
|
||||
)
|
||||
|
||||
grpcControlSrv := grpc.NewServer()
|
||||
control.RegisterControlServiceServer(grpcControlSrv, controlSvc)
|
||||
|
||||
s.runners = append(s.runners, func(ch chan<- error) error {
|
||||
lis, err := net.Listen("tcp", controlSvcEndpoint)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
go func() {
|
||||
ch <- grpcControlSrv.Serve(lis)
|
||||
}()
|
||||
return nil
|
||||
})
|
||||
|
||||
s.registerNoErrCloser(grpcControlSrv.GracefulStop)
|
||||
return nil
|
||||
}
|
||||
|
||||
type serverMorphClients struct {
|
||||
CnrClient *cntClient.Client
|
||||
FrostFSIDClient *frostfsid.Client
|
||||
FrostFSClient *frostfsClient.Client
|
||||
MorphSubnetClient *morphsubnet.Client
|
||||
}
|
||||
|
||||
func (s *Server) initClientsFromMorph() (*serverMorphClients, error) {
|
||||
result := &serverMorphClients{}
|
||||
var err error
|
||||
|
||||
fee := s.feeConfig.SideChainFee()
|
||||
// do not use TryNotary() in audit wrapper
|
||||
// audit operations do not require multisignatures
|
||||
s.auditClient, err = auditClient.NewFromMorph(s.morphClient, s.contracts.audit, fee)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// form morph container client's options
|
||||
morphCnrOpts := make([]cntClient.Option, 0, 3)
|
||||
morphCnrOpts = append(morphCnrOpts,
|
||||
cntClient.TryNotary(),
|
||||
cntClient.AsAlphabet(),
|
||||
)
|
||||
|
||||
if s.sideNotaryConfig.disabled {
|
||||
// in non-notary environments we customize fee for named container registration
|
||||
// because it takes much more additional GAS than other operations.
|
||||
morphCnrOpts = append(morphCnrOpts,
|
||||
cntClient.WithCustomFeeForNamedPut(s.feeConfig.NamedContainerRegistrationFee()),
|
||||
)
|
||||
}
|
||||
|
||||
result.CnrClient, err = cntClient.NewFromMorph(s.morphClient, s.contracts.container, fee, morphCnrOpts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
s.netmapClient, err = nmClient.NewFromMorph(s.morphClient, s.contracts.netmap, fee, nmClient.TryNotary(), nmClient.AsAlphabet())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
s.balanceClient, err = balanceClient.NewFromMorph(s.morphClient, s.contracts.balance, fee, balanceClient.TryNotary(), balanceClient.AsAlphabet())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
result.FrostFSIDClient, err = frostfsid.NewFromMorph(s.morphClient, s.contracts.frostfsID, fee, frostfsid.TryNotary(), frostfsid.AsAlphabet())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
result.FrostFSClient, err = frostfsClient.NewFromMorph(s.mainnetClient, s.contracts.frostfs,
|
||||
s.feeConfig.MainChainFee(), frostfsClient.TryNotary(), frostfsClient.AsAlphabet())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
result.MorphSubnetClient, err = s.createMorphSubnetClient()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
type serverProcessors struct {
|
||||
AlphabetProcessor *alphabet.Processor
|
||||
SettlementProcessor *settlement.Processor
|
||||
}
|
||||
|
||||
func (s *Server) initProcessors(cfg *viper.Viper, morphClients *serverMorphClients) (*serverProcessors, error) {
|
||||
result := &serverProcessors{}
|
||||
|
||||
fee := s.feeConfig.SideChainFee()
|
||||
|
||||
irf := s.createIRFetcher()
|
||||
|
||||
s.statusIndex = newInnerRingIndexer(
|
||||
s.morphClient,
|
||||
irf,
|
||||
s.key.PublicKey(),
|
||||
cfg.GetDuration("indexer.cache_timeout"),
|
||||
)
|
||||
|
||||
clientCache := newClientCache(&clientCacheParams{
|
||||
Log: s.log,
|
||||
Key: &s.key.PrivateKey,
|
||||
SGTimeout: cfg.GetDuration("audit.timeout.get"),
|
||||
HeadTimeout: cfg.GetDuration("audit.timeout.head"),
|
||||
RangeTimeout: cfg.GetDuration("audit.timeout.rangehash"),
|
||||
AllowExternal: cfg.GetBool("audit.allow_external"),
|
||||
})
|
||||
|
||||
s.registerNoErrCloser(clientCache.cache.CloseAll)
|
||||
|
||||
// create audit processor
|
||||
auditProcessor, err := s.createAuditProcessor(cfg, clientCache, morphClients.CnrClient)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
result.SettlementProcessor = s.createSettlementProcessor(clientCache, morphClients.CnrClient)
|
||||
|
||||
var alphaSync event.Handler
|
||||
alphaSync, err = s.createAlphaSync(cfg, morphClients.FrostFSClient, irf)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = s.initNetmapProcessor(cfg, morphClients.CnrClient, alphaSync, morphClients.MorphSubnetClient, auditProcessor, result.SettlementProcessor)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = s.initContainerProcessor(cfg, morphClients.CnrClient, morphClients.FrostFSIDClient, morphClients.MorphSubnetClient)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = s.initBalanceProcessor(cfg, morphClients.FrostFSClient)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = s.initFrostFSMainnetProcessor(cfg, morphClients.FrostFSIDClient)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
result.AlphabetProcessor, err = s.initAlphabetProcessor(cfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = s.initReputationProcessor(cfg, fee)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (s *Server) initMorph(ctx context.Context, cfg *viper.Viper, errChan chan<- error) (*chainParams, error) {
|
||||
fromSideChainBlock, err := s.persistate.UInt32(persistateSideChainLastBlockKey)
|
||||
if err != nil {
|
||||
fromSideChainBlock = 0
|
||||
s.log.Warn("can't get last processed side chain block number", zap.String("error", err.Error()))
|
||||
}
|
||||
|
||||
morphChain := &chainParams{
|
||||
log: s.log,
|
||||
cfg: cfg,
|
||||
key: s.key,
|
||||
name: morphPrefix,
|
||||
from: fromSideChainBlock,
|
||||
}
|
||||
|
||||
// create morph client
|
||||
s.morphClient, err = createClient(ctx, morphChain, errChan)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// create morph listener
|
||||
s.morphListener, err = createListener(ctx, s.morphClient, morphChain)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := s.morphClient.SetGroupSignerScope(); err != nil {
|
||||
morphChain.log.Info("failed to set group signer scope, continue with Global", zap.Error(err))
|
||||
}
|
||||
|
||||
return morphChain, nil
|
||||
}
|
||||
|
||||
func (s *Server) initContracts(cfg *viper.Viper) error {
|
||||
var err error
|
||||
// get all script hashes of contracts
|
||||
s.contracts, err = parseContracts(
|
||||
cfg,
|
||||
s.morphClient,
|
||||
s.withoutMainNet,
|
||||
s.mainNotaryConfig.disabled,
|
||||
s.sideNotaryConfig.disabled,
|
||||
)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *Server) initKey(cfg *viper.Viper) error {
|
||||
// prepare inner ring node private key
|
||||
acc, err := utilConfig.LoadAccount(
|
||||
cfg.GetString("wallet.path"),
|
||||
cfg.GetString("wallet.address"),
|
||||
cfg.GetString("wallet.password"))
|
||||
if err != nil {
|
||||
return fmt.Errorf("ir: %w", err)
|
||||
}
|
||||
|
||||
s.key = acc.PrivateKey()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Server) initMetrics(cfg *viper.Viper) {
|
||||
if cfg.GetString("prometheus.address") == "" {
|
||||
return
|
||||
}
|
||||
m := metrics.NewInnerRingMetrics()
|
||||
s.metrics = &m
|
||||
}
|
|
@ -2,47 +2,23 @@ package innerring
|
|||
|
||||
import (
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/config"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/alphabet"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/audit"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/balance"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/container"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/frostfs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/governance"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap"
|
||||
nodevalidator "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap/nodevalidation"
|
||||
addrvalidator "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap/nodevalidation/maddress"
|
||||
statevalidation "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap/nodevalidation/state"
|
||||
subnetvalidator "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap/nodevalidation/subnet"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/reputation"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/settlement"
|
||||
auditSettlement "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/settlement/audit"
|
||||
timerEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/timers"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/metrics"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
|
||||
auditClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/audit"
|
||||
balanceClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/balance"
|
||||
cntClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container"
|
||||
frostfsClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/frostfs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/frostfsid"
|
||||
nmClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
|
||||
repClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/reputation"
|
||||
morphsubnet "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/subnet"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/subscriber"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/timer"
|
||||
audittask "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/audit/taskmanager"
|
||||
control "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/ir"
|
||||
controlsrv "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/ir/server"
|
||||
reputationcommon "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/common"
|
||||
util2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
||||
utilConfig "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/config"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/precision"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/state"
|
||||
|
@ -50,13 +26,10 @@ import (
|
|||
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
|
||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||
"github.com/nspcc-dev/neo-go/pkg/encoding/address"
|
||||
"github.com/nspcc-dev/neo-go/pkg/encoding/fixedn"
|
||||
"github.com/nspcc-dev/neo-go/pkg/util"
|
||||
"github.com/panjf2000/ants/v2"
|
||||
"github.com/spf13/viper"
|
||||
"go.uber.org/atomic"
|
||||
"go.uber.org/zap"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
type (
|
||||
|
@ -158,8 +131,6 @@ var (
|
|||
)
|
||||
|
||||
// Start runs all event providers.
|
||||
//
|
||||
// nolint: funlen
|
||||
func (s *Server) Start(ctx context.Context, intError chan<- error) (err error) {
|
||||
s.setHealthStatus(control.HealthStatus_STARTING)
|
||||
defer func() {
|
||||
|
@ -168,10 +139,9 @@ func (s *Server) Start(ctx context.Context, intError chan<- error) (err error) {
|
|||
}
|
||||
}()
|
||||
|
||||
for _, starter := range s.starters {
|
||||
if err := starter(); err != nil {
|
||||
return err
|
||||
}
|
||||
err = s.launchStarters()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = s.initConfigFromBlockchain()
|
||||
|
@ -179,26 +149,14 @@ func (s *Server) Start(ctx context.Context, intError chan<- error) (err error) {
|
|||
return err
|
||||
}
|
||||
|
||||
if !s.mainNotaryConfig.disabled {
|
||||
err = s.initNotary(ctx,
|
||||
s.depositMainNotary,
|
||||
s.awaitMainNotaryDeposit,
|
||||
"waiting to accept main notary deposit",
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = s.initMainNotary(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if !s.sideNotaryConfig.disabled {
|
||||
err = s.initNotary(ctx,
|
||||
s.depositSideNotary,
|
||||
s.awaitSideNotaryDeposit,
|
||||
"waiting to accept side notary deposit",
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = s.initSideNotary(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
prm := governance.VoteValidatorPrm{}
|
||||
|
@ -212,13 +170,7 @@ func (s *Server) Start(ctx context.Context, intError chan<- error) (err error) {
|
|||
zap.String("error", err.Error()))
|
||||
}
|
||||
|
||||
// tick initial epoch
|
||||
initialEpochTicker := timer.NewOneTickTimer(
|
||||
timer.StaticBlockMeter(s.initialEpochTickDelta),
|
||||
func() {
|
||||
s.netmapProcessor.HandleNewEpochTick(timerEvent.NewEpochTick{})
|
||||
})
|
||||
s.addBlockTimer(initialEpochTicker)
|
||||
s.tickInitialExpoch()
|
||||
|
||||
morphErr := make(chan error)
|
||||
mainnnetErr := make(chan error)
|
||||
|
@ -235,36 +187,11 @@ func (s *Server) Start(ctx context.Context, intError chan<- error) (err error) {
|
|||
}
|
||||
}()
|
||||
|
||||
s.morphListener.RegisterBlockHandler(func(b *block.Block) {
|
||||
s.log.Debug("new block",
|
||||
zap.Uint32("index", b.Index),
|
||||
)
|
||||
s.registerMorphNewBlockEventHandler()
|
||||
s.registerMainnetNewBlockEventHandler()
|
||||
|
||||
err = s.persistate.SetUInt32(persistateSideChainLastBlockKey, b.Index)
|
||||
if err != nil {
|
||||
s.log.Warn("can't update persistent state",
|
||||
zap.String("chain", "side"),
|
||||
zap.Uint32("block_index", b.Index))
|
||||
}
|
||||
|
||||
s.tickTimers(b.Index)
|
||||
})
|
||||
|
||||
if !s.withoutMainNet {
|
||||
s.mainnetListener.RegisterBlockHandler(func(b *block.Block) {
|
||||
err = s.persistate.SetUInt32(persistateMainChainLastBlockKey, b.Index)
|
||||
if err != nil {
|
||||
s.log.Warn("can't update persistent state",
|
||||
zap.String("chain", "main"),
|
||||
zap.Uint32("block_index", b.Index))
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
for _, runner := range s.runners {
|
||||
if err := runner(intError); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := s.startRunners(intError); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
go s.morphListener.ListenWithError(ctx, morphErr) // listen for neo:morph events
|
||||
|
@ -279,6 +206,85 @@ func (s *Server) Start(ctx context.Context, intError chan<- error) (err error) {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (s *Server) registerMorphNewBlockEventHandler() {
|
||||
s.morphListener.RegisterBlockHandler(func(b *block.Block) {
|
||||
s.log.Debug("new block",
|
||||
zap.Uint32("index", b.Index),
|
||||
)
|
||||
|
||||
err := s.persistate.SetUInt32(persistateSideChainLastBlockKey, b.Index)
|
||||
if err != nil {
|
||||
s.log.Warn("can't update persistent state",
|
||||
zap.String("chain", "side"),
|
||||
zap.Uint32("block_index", b.Index))
|
||||
}
|
||||
|
||||
s.tickTimers(b.Index)
|
||||
})
|
||||
}
|
||||
|
||||
func (s *Server) registerMainnetNewBlockEventHandler() {
|
||||
if !s.withoutMainNet {
|
||||
s.mainnetListener.RegisterBlockHandler(func(b *block.Block) {
|
||||
err := s.persistate.SetUInt32(persistateMainChainLastBlockKey, b.Index)
|
||||
if err != nil {
|
||||
s.log.Warn("can't update persistent state",
|
||||
zap.String("chain", "main"),
|
||||
zap.Uint32("block_index", b.Index))
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) startRunners(errCh chan<- error) error {
|
||||
for _, runner := range s.runners {
|
||||
if err := runner(errCh); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Server) launchStarters() error {
|
||||
for _, starter := range s.starters {
|
||||
if err := starter(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Server) initMainNotary(ctx context.Context) error {
|
||||
if !s.mainNotaryConfig.disabled {
|
||||
return s.initNotary(ctx,
|
||||
s.depositMainNotary,
|
||||
s.awaitMainNotaryDeposit,
|
||||
"waiting to accept main notary deposit",
|
||||
)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Server) initSideNotary(ctx context.Context) error {
|
||||
if !s.sideNotaryConfig.disabled {
|
||||
return s.initNotary(ctx,
|
||||
s.depositSideNotary,
|
||||
s.awaitSideNotaryDeposit,
|
||||
"waiting to accept side notary deposit",
|
||||
)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Server) tickInitialExpoch() {
|
||||
initialEpochTicker := timer.NewOneTickTimer(
|
||||
timer.StaticBlockMeter(s.initialEpochTickDelta),
|
||||
func() {
|
||||
s.netmapProcessor.HandleNewEpochTick(timerEvent.NewEpochTick{})
|
||||
})
|
||||
s.addBlockTimer(initialEpochTicker)
|
||||
}
|
||||
|
||||
func (s *Server) startWorkers(ctx context.Context) {
|
||||
for _, w := range s.workers {
|
||||
go w(ctx)
|
||||
|
@ -321,8 +327,6 @@ func (s *Server) registerStarter(f func() error) {
|
|||
}
|
||||
|
||||
// New creates instance of inner ring sever structure.
|
||||
//
|
||||
// nolint: funlen, gocognit
|
||||
func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan<- error) (*Server, error) {
|
||||
var err error
|
||||
server := &Server{log: log}
|
||||
|
@ -332,128 +336,38 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan
|
|||
// parse notary support
|
||||
server.feeConfig = config.NewFeeConfig(cfg)
|
||||
|
||||
// prepare inner ring node private key
|
||||
acc, err := utilConfig.LoadAccount(
|
||||
cfg.GetString("wallet.path"),
|
||||
cfg.GetString("wallet.address"),
|
||||
cfg.GetString("wallet.password"))
|
||||
err = server.initKey(cfg)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("ir: %w", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
server.key = acc.PrivateKey()
|
||||
|
||||
server.persistate, err = initPersistentStateStorage(cfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
server.registerCloser(server.persistate.Close)
|
||||
|
||||
fromSideChainBlock, err := server.persistate.UInt32(persistateSideChainLastBlockKey)
|
||||
if err != nil {
|
||||
fromSideChainBlock = 0
|
||||
log.Warn("can't get last processed side chain block number", zap.String("error", err.Error()))
|
||||
}
|
||||
|
||||
morphChain := &chainParams{
|
||||
log: log,
|
||||
cfg: cfg,
|
||||
key: server.key,
|
||||
name: morphPrefix,
|
||||
from: fromSideChainBlock,
|
||||
}
|
||||
|
||||
// create morph client
|
||||
server.morphClient, err = createClient(ctx, morphChain, errChan)
|
||||
var morphChain *chainParams
|
||||
morphChain, err = server.initMorph(ctx, cfg, errChan)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// create morph listener
|
||||
server.morphListener, err = createListener(ctx, server.morphClient, morphChain)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := server.morphClient.SetGroupSignerScope(); err != nil {
|
||||
morphChain.log.Info("failed to set group signer scope, continue with Global", zap.Error(err))
|
||||
}
|
||||
|
||||
server.withoutMainNet = cfg.GetBool("without_mainnet")
|
||||
|
||||
if server.withoutMainNet {
|
||||
// This works as long as event Listener starts listening loop once,
|
||||
// otherwise Server.Start will run two similar routines.
|
||||
// This behavior most likely will not change.
|
||||
server.mainnetListener = server.morphListener
|
||||
server.mainnetClient = server.morphClient
|
||||
} else {
|
||||
mainnetChain := morphChain
|
||||
mainnetChain.name = mainnetPrefix
|
||||
mainnetChain.sgn = &transaction.Signer{Scopes: transaction.CalledByEntry}
|
||||
|
||||
fromMainChainBlock, err := server.persistate.UInt32(persistateMainChainLastBlockKey)
|
||||
if err != nil {
|
||||
fromMainChainBlock = 0
|
||||
log.Warn("can't get last processed main chain block number", zap.String("error", err.Error()))
|
||||
}
|
||||
mainnetChain.from = fromMainChainBlock
|
||||
|
||||
// create mainnet client
|
||||
server.mainnetClient, err = createClient(ctx, mainnetChain, errChan)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// create mainnet listener
|
||||
server.mainnetListener, err = createListener(ctx, server.mainnetClient, mainnetChain)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
server.mainNotaryConfig, server.sideNotaryConfig = notaryConfigs(
|
||||
server.morphClient.ProbeNotary(),
|
||||
!server.withoutMainNet && server.mainnetClient.ProbeNotary(), // if mainnet disabled then notary flag must be disabled too
|
||||
)
|
||||
|
||||
log.Info("notary support",
|
||||
zap.Bool("sidechain_enabled", !server.sideNotaryConfig.disabled),
|
||||
zap.Bool("mainchain_enabled", !server.mainNotaryConfig.disabled),
|
||||
)
|
||||
|
||||
// get all script hashes of contracts
|
||||
server.contracts, err = parseContracts(
|
||||
cfg,
|
||||
server.morphClient,
|
||||
server.withoutMainNet,
|
||||
server.mainNotaryConfig.disabled,
|
||||
server.sideNotaryConfig.disabled,
|
||||
)
|
||||
err = server.initMainnet(ctx, cfg, morphChain, errChan)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if !server.sideNotaryConfig.disabled {
|
||||
// enable notary support in the side client
|
||||
err = server.morphClient.EnableNotarySupport(
|
||||
client.WithProxyContract(server.contracts.proxy),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not enable side chain notary support: %w", err)
|
||||
}
|
||||
server.initNotaryConfig(cfg)
|
||||
|
||||
server.morphListener.EnableNotarySupport(server.contracts.proxy, server.morphClient.Committee, server.morphClient)
|
||||
err = server.initContracts(cfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if !server.mainNotaryConfig.disabled {
|
||||
// enable notary support in the main client
|
||||
err = server.mainnetClient.EnableNotarySupport(
|
||||
client.WithProxyContract(server.contracts.processing),
|
||||
client.WithAlphabetSource(server.morphClient.Committee),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not enable main chain notary support: %w", err)
|
||||
}
|
||||
err = server.enableNotarySupport()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// parse default validators
|
||||
|
@ -464,482 +378,30 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan
|
|||
|
||||
server.pubKey = server.key.PublicKey().Bytes()
|
||||
|
||||
auditPool, err := ants.NewPool(cfg.GetInt("audit.task.exec_pool_size"))
|
||||
var morphClients *serverMorphClients
|
||||
morphClients, err = server.initClientsFromMorph()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
fee := server.feeConfig.SideChainFee()
|
||||
|
||||
// do not use TryNotary() in audit wrapper
|
||||
// audit operations do not require multisignatures
|
||||
server.auditClient, err = auditClient.NewFromMorph(server.morphClient, server.contracts.audit, fee)
|
||||
var processors *serverProcessors
|
||||
processors, err = server.initProcessors(cfg, morphClients)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// form morph container client's options
|
||||
morphCnrOpts := make([]cntClient.Option, 0, 3)
|
||||
morphCnrOpts = append(morphCnrOpts,
|
||||
cntClient.TryNotary(),
|
||||
cntClient.AsAlphabet(),
|
||||
)
|
||||
server.initTimers(cfg, processors, morphClients)
|
||||
|
||||
if server.sideNotaryConfig.disabled {
|
||||
// in non-notary environments we customize fee for named container registration
|
||||
// because it takes much more additional GAS than other operations.
|
||||
morphCnrOpts = append(morphCnrOpts,
|
||||
cntClient.WithCustomFeeForNamedPut(server.feeConfig.NamedContainerRegistrationFee()),
|
||||
)
|
||||
}
|
||||
|
||||
cnrClient, err := cntClient.NewFromMorph(server.morphClient, server.contracts.container, fee, morphCnrOpts...)
|
||||
err = server.initGRPCServer(cfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
server.netmapClient, err = nmClient.NewFromMorph(server.morphClient, server.contracts.netmap, fee, nmClient.TryNotary(), nmClient.AsAlphabet())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
server.balanceClient, err = balanceClient.NewFromMorph(server.morphClient, server.contracts.balance, fee, balanceClient.TryNotary(), balanceClient.AsAlphabet())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
repClient, err := repClient.NewFromMorph(server.morphClient, server.contracts.reputation, fee, repClient.TryNotary(), repClient.AsAlphabet())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
frostfsIDClient, err := frostfsid.NewFromMorph(server.morphClient, server.contracts.frostfsID, fee, frostfsid.TryNotary(), frostfsid.AsAlphabet())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
frostfsCli, err := frostfsClient.NewFromMorph(server.mainnetClient, server.contracts.frostfs,
|
||||
server.feeConfig.MainChainFee(), frostfsClient.TryNotary(), frostfsClient.AsAlphabet())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// initialize morph client of Subnet contract
|
||||
clientMode := morphsubnet.NotaryAlphabet
|
||||
|
||||
if server.sideNotaryConfig.disabled {
|
||||
clientMode = morphsubnet.NonNotary
|
||||
}
|
||||
|
||||
subnetInitPrm := morphsubnet.InitPrm{}
|
||||
subnetInitPrm.SetBaseClient(server.morphClient)
|
||||
subnetInitPrm.SetContractAddress(server.contracts.subnet)
|
||||
subnetInitPrm.SetMode(clientMode)
|
||||
|
||||
subnetClient := &morphsubnet.Client{}
|
||||
err = subnetClient.Init(subnetInitPrm)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not initialize subnet client: %w", err)
|
||||
}
|
||||
|
||||
var irf irFetcher
|
||||
|
||||
if server.withoutMainNet || !server.mainNotaryConfig.disabled {
|
||||
// if mainchain is disabled we should use NeoFSAlphabetList client method according to its docs
|
||||
// (naming `...WithNotary` will not always be correct)
|
||||
irf = NewIRFetcherWithNotary(server.morphClient)
|
||||
} else {
|
||||
irf = NewIRFetcherWithoutNotary(server.netmapClient)
|
||||
}
|
||||
|
||||
server.statusIndex = newInnerRingIndexer(
|
||||
server.morphClient,
|
||||
irf,
|
||||
server.key.PublicKey(),
|
||||
cfg.GetDuration("indexer.cache_timeout"),
|
||||
)
|
||||
|
||||
clientCache := newClientCache(&clientCacheParams{
|
||||
Log: log,
|
||||
Key: &server.key.PrivateKey,
|
||||
SGTimeout: cfg.GetDuration("audit.timeout.get"),
|
||||
HeadTimeout: cfg.GetDuration("audit.timeout.head"),
|
||||
RangeTimeout: cfg.GetDuration("audit.timeout.rangehash"),
|
||||
AllowExternal: cfg.GetBool("audit.allow_external"),
|
||||
})
|
||||
|
||||
server.registerNoErrCloser(clientCache.cache.CloseAll)
|
||||
|
||||
pdpPoolSize := cfg.GetInt("audit.pdp.pairs_pool_size")
|
||||
porPoolSize := cfg.GetInt("audit.por.pool_size")
|
||||
|
||||
// create audit processor dependencies
|
||||
auditTaskManager := audittask.New(
|
||||
audittask.WithQueueCapacity(cfg.GetUint32("audit.task.queue_capacity")),
|
||||
audittask.WithWorkerPool(auditPool),
|
||||
audittask.WithLogger(log),
|
||||
audittask.WithContainerCommunicator(clientCache),
|
||||
audittask.WithMaxPDPSleepInterval(cfg.GetDuration("audit.pdp.max_sleep_interval")),
|
||||
audittask.WithPDPWorkerPoolGenerator(func() (util2.WorkerPool, error) {
|
||||
return ants.NewPool(pdpPoolSize)
|
||||
}),
|
||||
audittask.WithPoRWorkerPoolGenerator(func() (util2.WorkerPool, error) {
|
||||
return ants.NewPool(porPoolSize)
|
||||
}),
|
||||
)
|
||||
|
||||
server.workers = append(server.workers, auditTaskManager.Listen)
|
||||
|
||||
// create audit processor
|
||||
auditProcessor, err := audit.New(&audit.Params{
|
||||
Log: log,
|
||||
NetmapClient: server.netmapClient,
|
||||
ContainerClient: cnrClient,
|
||||
IRList: server,
|
||||
EpochSource: server,
|
||||
SGSource: clientCache,
|
||||
Key: &server.key.PrivateKey,
|
||||
RPCSearchTimeout: cfg.GetDuration("audit.timeout.search"),
|
||||
TaskManager: auditTaskManager,
|
||||
Reporter: server,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// create settlement processor dependencies
|
||||
settlementDeps := settlementDeps{
|
||||
log: server.log,
|
||||
cnrSrc: cntClient.AsContainerSource(cnrClient),
|
||||
auditClient: server.auditClient,
|
||||
nmClient: server.netmapClient,
|
||||
clientCache: clientCache,
|
||||
balanceClient: server.balanceClient,
|
||||
}
|
||||
|
||||
settlementDeps.settlementCtx = auditSettlementContext
|
||||
auditCalcDeps := &auditSettlementDeps{
|
||||
settlementDeps: settlementDeps,
|
||||
}
|
||||
|
||||
settlementDeps.settlementCtx = basicIncomeSettlementContext
|
||||
basicSettlementDeps := &basicIncomeSettlementDeps{
|
||||
settlementDeps: settlementDeps,
|
||||
cnrClient: cnrClient,
|
||||
}
|
||||
|
||||
auditSettlementCalc := auditSettlement.NewCalculator(
|
||||
&auditSettlement.CalculatorPrm{
|
||||
ResultStorage: auditCalcDeps,
|
||||
ContainerStorage: auditCalcDeps,
|
||||
PlacementCalculator: auditCalcDeps,
|
||||
SGStorage: auditCalcDeps,
|
||||
AccountStorage: auditCalcDeps,
|
||||
Exchanger: auditCalcDeps,
|
||||
AuditFeeFetcher: server.netmapClient,
|
||||
},
|
||||
auditSettlement.WithLogger(server.log),
|
||||
)
|
||||
|
||||
// create settlement processor
|
||||
settlementProcessor := settlement.New(
|
||||
settlement.Prm{
|
||||
AuditProcessor: (*auditSettlementCalculator)(auditSettlementCalc),
|
||||
BasicIncome: &basicSettlementConstructor{dep: basicSettlementDeps},
|
||||
State: server,
|
||||
},
|
||||
settlement.WithLogger(server.log),
|
||||
)
|
||||
|
||||
locodeValidator, err := server.newLocodeValidator(cfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
subnetValidator, err := subnetvalidator.New(
|
||||
subnetvalidator.Prm{
|
||||
SubnetClient: subnetClient,
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var alphaSync event.Handler
|
||||
|
||||
if server.withoutMainNet || cfg.GetBool("governance.disable") {
|
||||
alphaSync = func(event.Event) {
|
||||
log.Debug("alphabet keys sync is disabled")
|
||||
}
|
||||
} else {
|
||||
// create governance processor
|
||||
governanceProcessor, err := governance.New(&governance.Params{
|
||||
Log: log,
|
||||
FrostFSClient: frostfsCli,
|
||||
NetmapClient: server.netmapClient,
|
||||
AlphabetState: server,
|
||||
EpochState: server,
|
||||
Voter: server,
|
||||
IRFetcher: irf,
|
||||
MorphClient: server.morphClient,
|
||||
MainnetClient: server.mainnetClient,
|
||||
NotaryDisabled: server.sideNotaryConfig.disabled,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
alphaSync = governanceProcessor.HandleAlphabetSync
|
||||
err = bindMainnetProcessor(governanceProcessor, server)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
netSettings := (*networkSettings)(server.netmapClient)
|
||||
|
||||
var netMapCandidateStateValidator statevalidation.NetMapCandidateValidator
|
||||
netMapCandidateStateValidator.SetNetworkSettings(netSettings)
|
||||
|
||||
// create netmap processor
|
||||
server.netmapProcessor, err = netmap.New(&netmap.Params{
|
||||
Log: log,
|
||||
PoolSize: cfg.GetInt("workers.netmap"),
|
||||
NetmapClient: server.netmapClient,
|
||||
EpochTimer: server,
|
||||
EpochState: server,
|
||||
AlphabetState: server,
|
||||
CleanupEnabled: cfg.GetBool("netmap_cleaner.enabled"),
|
||||
CleanupThreshold: cfg.GetUint64("netmap_cleaner.threshold"),
|
||||
ContainerWrapper: cnrClient,
|
||||
HandleAudit: server.onlyActiveEventHandler(
|
||||
auditProcessor.StartAuditHandler(),
|
||||
),
|
||||
NotaryDepositHandler: server.onlyAlphabetEventHandler(
|
||||
server.notaryHandler,
|
||||
),
|
||||
AuditSettlementsHandler: server.onlyAlphabetEventHandler(
|
||||
settlementProcessor.HandleAuditEvent,
|
||||
),
|
||||
AlphabetSyncHandler: alphaSync,
|
||||
NodeValidator: nodevalidator.New(
|
||||
&netMapCandidateStateValidator,
|
||||
addrvalidator.New(),
|
||||
locodeValidator,
|
||||
subnetValidator,
|
||||
),
|
||||
NotaryDisabled: server.sideNotaryConfig.disabled,
|
||||
SubnetContract: &server.contracts.subnet,
|
||||
|
||||
NodeStateSettings: netSettings,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = bindMorphProcessor(server.netmapProcessor, server)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// container processor
|
||||
containerProcessor, err := container.New(&container.Params{
|
||||
Log: log,
|
||||
PoolSize: cfg.GetInt("workers.container"),
|
||||
AlphabetState: server,
|
||||
ContainerClient: cnrClient,
|
||||
FrostFSIDClient: frostfsIDClient,
|
||||
NetworkState: server.netmapClient,
|
||||
NotaryDisabled: server.sideNotaryConfig.disabled,
|
||||
SubnetClient: subnetClient,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = bindMorphProcessor(containerProcessor, server)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// create balance processor
|
||||
balanceProcessor, err := balance.New(&balance.Params{
|
||||
Log: log,
|
||||
PoolSize: cfg.GetInt("workers.balance"),
|
||||
FrostFSClient: frostfsCli,
|
||||
BalanceSC: server.contracts.balance,
|
||||
AlphabetState: server,
|
||||
Converter: &server.precision,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = bindMorphProcessor(balanceProcessor, server)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if !server.withoutMainNet {
|
||||
// create mainnnet frostfs processor
|
||||
frostfsProcessor, err := frostfs.New(&frostfs.Params{
|
||||
Log: log,
|
||||
PoolSize: cfg.GetInt("workers.frostfs"),
|
||||
FrostFSContract: server.contracts.frostfs,
|
||||
FrostFSIDClient: frostfsIDClient,
|
||||
BalanceClient: server.balanceClient,
|
||||
NetmapClient: server.netmapClient,
|
||||
MorphClient: server.morphClient,
|
||||
EpochState: server,
|
||||
AlphabetState: server,
|
||||
Converter: &server.precision,
|
||||
MintEmitCacheSize: cfg.GetInt("emit.mint.cache_size"),
|
||||
MintEmitThreshold: cfg.GetUint64("emit.mint.threshold"),
|
||||
MintEmitValue: fixedn.Fixed8(cfg.GetInt64("emit.mint.value")),
|
||||
GasBalanceThreshold: cfg.GetInt64("emit.gas.balance_threshold"),
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = bindMainnetProcessor(frostfsProcessor, server)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
parsedWallets, err := parseWalletAddressesFromStrings(cfg.GetStringSlice("emit.extra_wallets"))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// create alphabet processor
|
||||
alphabetProcessor, err := alphabet.New(&alphabet.Params{
|
||||
ParsedWallets: parsedWallets,
|
||||
Log: log,
|
||||
PoolSize: cfg.GetInt("workers.alphabet"),
|
||||
AlphabetContracts: server.contracts.alphabet,
|
||||
NetmapClient: server.netmapClient,
|
||||
MorphClient: server.morphClient,
|
||||
IRList: server,
|
||||
StorageEmission: cfg.GetUint64("emit.storage.amount"),
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = bindMorphProcessor(alphabetProcessor, server)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// create reputation processor
|
||||
reputationProcessor, err := reputation.New(&reputation.Params{
|
||||
Log: log,
|
||||
PoolSize: cfg.GetInt("workers.reputation"),
|
||||
EpochState: server,
|
||||
AlphabetState: server,
|
||||
ReputationWrapper: repClient,
|
||||
ManagerBuilder: reputationcommon.NewManagerBuilder(
|
||||
reputationcommon.ManagersPrm{
|
||||
NetMapSource: server.netmapClient,
|
||||
},
|
||||
),
|
||||
NotaryDisabled: server.sideNotaryConfig.disabled,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = bindMorphProcessor(reputationProcessor, server)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// initialize epoch timers
|
||||
server.epochTimer = newEpochTimer(&epochTimerArgs{
|
||||
l: server.log,
|
||||
newEpochHandlers: server.newEpochTickHandlers(),
|
||||
cnrWrapper: cnrClient,
|
||||
epoch: server,
|
||||
stopEstimationDMul: cfg.GetUint32("timers.stop_estimation.mul"),
|
||||
stopEstimationDDiv: cfg.GetUint32("timers.stop_estimation.div"),
|
||||
collectBasicIncome: subEpochEventHandler{
|
||||
handler: settlementProcessor.HandleIncomeCollectionEvent,
|
||||
durationMul: cfg.GetUint32("timers.collect_basic_income.mul"),
|
||||
durationDiv: cfg.GetUint32("timers.collect_basic_income.div"),
|
||||
},
|
||||
distributeBasicIncome: subEpochEventHandler{
|
||||
handler: settlementProcessor.HandleIncomeDistributionEvent,
|
||||
durationMul: cfg.GetUint32("timers.distribute_basic_income.mul"),
|
||||
durationDiv: cfg.GetUint32("timers.distribute_basic_income.div"),
|
||||
},
|
||||
})
|
||||
|
||||
server.addBlockTimer(server.epochTimer)
|
||||
|
||||
// initialize emission timer
|
||||
emissionTimer := newEmissionTimer(&emitTimerArgs{
|
||||
ap: alphabetProcessor,
|
||||
emitDuration: cfg.GetUint32("timers.emit"),
|
||||
})
|
||||
|
||||
server.addBlockTimer(emissionTimer)
|
||||
|
||||
controlSvcEndpoint := cfg.GetString("control.grpc.endpoint")
|
||||
if controlSvcEndpoint != "" {
|
||||
authKeysStr := cfg.GetStringSlice("control.authorized_keys")
|
||||
authKeys := make([][]byte, 0, len(authKeysStr))
|
||||
|
||||
for i := range authKeysStr {
|
||||
key, err := hex.DecodeString(authKeysStr[i])
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not parse Control authorized key %s: %w",
|
||||
authKeysStr[i],
|
||||
err,
|
||||
)
|
||||
}
|
||||
|
||||
authKeys = append(authKeys, key)
|
||||
}
|
||||
|
||||
var p controlsrv.Prm
|
||||
|
||||
p.SetPrivateKey(*server.key)
|
||||
p.SetHealthChecker(server)
|
||||
|
||||
controlSvc := controlsrv.New(p,
|
||||
controlsrv.WithAllowedKeys(authKeys),
|
||||
)
|
||||
|
||||
grpcControlSrv := grpc.NewServer()
|
||||
control.RegisterControlServiceServer(grpcControlSrv, controlSvc)
|
||||
|
||||
server.runners = append(server.runners, func(ch chan<- error) error {
|
||||
lis, err := net.Listen("tcp", controlSvcEndpoint)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
go func() {
|
||||
ch <- grpcControlSrv.Serve(lis)
|
||||
}()
|
||||
return nil
|
||||
})
|
||||
|
||||
server.registerNoErrCloser(grpcControlSrv.GracefulStop)
|
||||
} else {
|
||||
log.Info("no Control server endpoint specified, service is disabled")
|
||||
}
|
||||
|
||||
server.initSubnet(subnetConfig{
|
||||
queueSize: cfg.GetUint32("workers.subnet"),
|
||||
})
|
||||
|
||||
if cfg.GetString("prometheus.address") != "" {
|
||||
m := metrics.NewInnerRingMetrics()
|
||||
server.metrics = &m
|
||||
}
|
||||
server.initMetrics(cfg)
|
||||
|
||||
return server, nil
|
||||
}
|
||||
|
|
|
@ -35,8 +35,6 @@ func (x *Client) SetPrivateKey(key *ecdsa.PrivateKey) {
|
|||
|
||||
// SearchSGPrm groups parameters of SearchSG operation.
|
||||
type SearchSGPrm struct {
|
||||
contextPrm
|
||||
|
||||
cnrID cid.ID
|
||||
}
|
||||
|
||||
|
@ -60,13 +58,13 @@ var sgFilter = storagegroup.SearchQuery()
|
|||
// SearchSG lists objects of storage group type in the container.
|
||||
//
|
||||
// Returns any error which prevented the operation from completing correctly in error return.
|
||||
func (x Client) SearchSG(prm SearchSGPrm) (*SearchSGRes, error) {
|
||||
func (x Client) SearchSG(ctx context.Context, prm SearchSGPrm) (*SearchSGRes, error) {
|
||||
var cliPrm client.PrmObjectSearch
|
||||
cliPrm.InContainer(prm.cnrID)
|
||||
cliPrm.SetFilters(sgFilter)
|
||||
cliPrm.UseKey(*x.key)
|
||||
|
||||
rdr, err := x.c.ObjectSearchInit(prm.ctx, cliPrm)
|
||||
rdr, err := x.c.ObjectSearchInit(ctx, cliPrm)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("init object search: %w", err)
|
||||
}
|
||||
|
@ -119,13 +117,13 @@ func (x GetObjectRes) Object() *object.Object {
|
|||
// GetObject reads the object by address.
|
||||
//
|
||||
// Returns any error which prevented the operation from completing correctly in error return.
|
||||
func (x Client) GetObject(prm GetObjectPrm) (*GetObjectRes, error) {
|
||||
func (x Client) GetObject(ctx context.Context, prm GetObjectPrm) (*GetObjectRes, error) {
|
||||
var cliPrm client.PrmObjectGet
|
||||
cliPrm.FromContainer(prm.objAddr.Container())
|
||||
cliPrm.ByID(prm.objAddr.Object())
|
||||
cliPrm.UseKey(*x.key)
|
||||
|
||||
rdr, err := x.c.ObjectGetInit(prm.ctx, cliPrm)
|
||||
rdr, err := x.c.ObjectGetInit(ctx, cliPrm)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("init object search: %w", err)
|
||||
}
|
||||
|
@ -189,7 +187,7 @@ func (x HeadObjectRes) Header() *object.Object {
|
|||
//
|
||||
// Returns any error which prevented the operation from completing correctly in error return.
|
||||
// For raw requests, returns *object.SplitInfoError error if the requested object is virtual.
|
||||
func (x Client) HeadObject(prm HeadObjectPrm) (*HeadObjectRes, error) {
|
||||
func (x Client) HeadObject(ctx context.Context, prm HeadObjectPrm) (*HeadObjectRes, error) {
|
||||
var cliPrm client.PrmObjectHead
|
||||
|
||||
if prm.raw {
|
||||
|
@ -204,7 +202,7 @@ func (x Client) HeadObject(prm HeadObjectPrm) (*HeadObjectRes, error) {
|
|||
cliPrm.ByID(prm.objAddr.Object())
|
||||
cliPrm.UseKey(*x.key)
|
||||
|
||||
cliRes, err := x.c.ObjectHead(prm.ctx, cliPrm)
|
||||
cliRes, err := x.c.ObjectHead(ctx, cliPrm)
|
||||
if err == nil {
|
||||
// pull out an error from status
|
||||
err = apistatus.ErrFromStatus(cliRes.Status())
|
||||
|
@ -231,10 +229,9 @@ func (x Client) HeadObject(prm HeadObjectPrm) (*HeadObjectRes, error) {
|
|||
func GetObjectPayload(ctx context.Context, c Client, addr oid.Address) ([]byte, error) {
|
||||
var prm GetObjectPrm
|
||||
|
||||
prm.SetContext(ctx)
|
||||
prm.SetAddress(addr)
|
||||
|
||||
obj, err := c.GetObject(prm)
|
||||
obj, err := c.GetObject(ctx, prm)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -245,7 +242,6 @@ func GetObjectPayload(ctx context.Context, c Client, addr oid.Address) ([]byte,
|
|||
func headObject(ctx context.Context, c Client, addr oid.Address, raw bool, ttl uint32) (*object.Object, error) {
|
||||
var prm HeadObjectPrm
|
||||
|
||||
prm.SetContext(ctx)
|
||||
prm.SetAddress(addr)
|
||||
prm.SetTTL(ttl)
|
||||
|
||||
|
@ -253,7 +249,7 @@ func headObject(ctx context.Context, c Client, addr oid.Address, raw bool, ttl u
|
|||
prm.SetRawFlag()
|
||||
}
|
||||
|
||||
obj, err := c.HeadObject(prm)
|
||||
obj, err := c.HeadObject(ctx, prm)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -298,14 +294,14 @@ func (x HashPayloadRangeRes) Hash() []byte {
|
|||
// from the remote server's local storage.
|
||||
//
|
||||
// Returns any error which prevented the operation from completing correctly in error return.
|
||||
func (x Client) HashPayloadRange(prm HashPayloadRangePrm) (res HashPayloadRangeRes, err error) {
|
||||
func (x Client) HashPayloadRange(ctx context.Context, prm HashPayloadRangePrm) (res HashPayloadRangeRes, err error) {
|
||||
var cliPrm client.PrmObjectHash
|
||||
cliPrm.FromContainer(prm.objAddr.Container())
|
||||
cliPrm.ByID(prm.objAddr.Object())
|
||||
cliPrm.SetRangeList(prm.rng.GetOffset(), prm.rng.GetLength())
|
||||
cliPrm.TillichZemorAlgo()
|
||||
|
||||
cliRes, err := x.c.ObjectHash(prm.ctx, cliPrm)
|
||||
cliRes, err := x.c.ObjectHash(ctx, cliPrm)
|
||||
if err == nil {
|
||||
// pull out an error from status
|
||||
err = apistatus.ErrFromStatus(cliRes.Status())
|
||||
|
@ -331,11 +327,10 @@ func (x Client) HashPayloadRange(prm HashPayloadRangePrm) (res HashPayloadRangeR
|
|||
func HashObjectRange(ctx context.Context, c Client, addr oid.Address, rng *object.Range) ([]byte, error) {
|
||||
var prm HashPayloadRangePrm
|
||||
|
||||
prm.SetContext(ctx)
|
||||
prm.SetAddress(addr)
|
||||
prm.SetRange(rng)
|
||||
|
||||
res, err := c.HashPayloadRange(prm)
|
||||
res, err := c.HashPayloadRange(ctx, prm)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -1,21 +1,9 @@
|
|||
package frostfsapiclient
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
)
|
||||
|
||||
// nolint: containedctx
|
||||
type contextPrm struct {
|
||||
ctx context.Context
|
||||
}
|
||||
|
||||
// SetContext sets context.Context used for network communication.
|
||||
func (x *contextPrm) SetContext(ctx context.Context) {
|
||||
x.ctx = ctx
|
||||
}
|
||||
|
||||
type objectAddressPrm struct {
|
||||
objAddr oid.Address
|
||||
}
|
||||
|
@ -26,6 +14,5 @@ func (x *objectAddressPrm) SetAddress(addr oid.Address) {
|
|||
}
|
||||
|
||||
type getObjectPrm struct {
|
||||
contextPrm
|
||||
objectAddressPrm
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@ package alphabet
|
|||
import (
|
||||
"crypto/elliptic"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
|
||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||
"github.com/nspcc-dev/neo-go/pkg/encoding/fixedn"
|
||||
"go.uber.org/zap"
|
||||
|
@ -10,7 +11,6 @@ import (
|
|||
|
||||
const emitMethod = "emit"
|
||||
|
||||
// nolint: funlen
|
||||
func (ap *Processor) processEmit() {
|
||||
index := ap.irList.AlphabetIndex()
|
||||
if index < 0 {
|
||||
|
@ -63,6 +63,12 @@ func (ap *Processor) processEmit() {
|
|||
|
||||
gasPerNode := fixedn.Fixed8(ap.storageEmission / uint64(nmLen+extraLen))
|
||||
|
||||
ap.transferGasToNetmapNodes(nmNodes, gasPerNode)
|
||||
|
||||
ap.transferGasToExtraNodes(extraLen, gasPerNode)
|
||||
}
|
||||
|
||||
func (ap *Processor) transferGasToNetmapNodes(nmNodes []netmap.NodeInfo, gasPerNode fixedn.Fixed8) {
|
||||
for i := range nmNodes {
|
||||
keyBytes := nmNodes[i].PublicKey()
|
||||
|
||||
|
@ -83,9 +89,11 @@ func (ap *Processor) processEmit() {
|
|||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (ap *Processor) transferGasToExtraNodes(extraLen int, gasPerNode fixedn.Fixed8) {
|
||||
if extraLen != 0 {
|
||||
err = ap.morphClient.BatchTransferGas(ap.parsedWallets, gasPerNode)
|
||||
err := ap.morphClient.BatchTransferGas(ap.parsedWallets, gasPerNode)
|
||||
if err != nil {
|
||||
receiversLog := make([]string, extraLen)
|
||||
for i, addr := range ap.parsedWallets {
|
||||
|
|
|
@ -17,7 +17,6 @@ import (
|
|||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// nolint: funlen
|
||||
func (ap *Processor) processStartAudit(epoch uint64) {
|
||||
log := ap.log.With(zap.Uint64("epoch", epoch))
|
||||
|
||||
|
@ -52,6 +51,10 @@ func (ap *Processor) processStartAudit(epoch uint64) {
|
|||
|
||||
pivot := make([]byte, sha256.Size)
|
||||
|
||||
ap.startAuditTasksOnContainers(auditCtx, containers, log, pivot, nm, epoch)
|
||||
}
|
||||
|
||||
func (ap *Processor) startAuditTasksOnContainers(ctx context.Context, containers []cid.ID, log *zap.Logger, pivot []byte, nm *netmap.NetMap, epoch uint64) {
|
||||
for i := range containers {
|
||||
cnr, err := cntClient.Get(ap.containerClient, containers[i]) // get container structure
|
||||
if err != nil {
|
||||
|
@ -104,7 +107,7 @@ func (ap *Processor) processStartAudit(epoch uint64) {
|
|||
epoch: epoch,
|
||||
rep: ap.reporter,
|
||||
}).
|
||||
WithAuditContext(auditCtx).
|
||||
WithAuditContext(ctx).
|
||||
WithContainerID(containers[i]).
|
||||
WithStorageGroupList(storageGroups).
|
||||
WithContainerStructure(cnr.Value).
|
||||
|
|
|
@ -46,8 +46,6 @@ type signatureVerificationData struct {
|
|||
// - v.binPublicKey is a public session key
|
||||
// - session context corresponds to the container and verb in v
|
||||
// - session is "alive"
|
||||
//
|
||||
// nolint: funlen
|
||||
func (cp *Processor) verifySignature(v signatureVerificationData) error {
|
||||
var err error
|
||||
var key frostfsecdsa.PublicKeyRFC6979
|
||||
|
@ -61,45 +59,7 @@ func (cp *Processor) verifySignature(v signatureVerificationData) error {
|
|||
}
|
||||
|
||||
if len(v.binTokenSession) > 0 {
|
||||
var tok session.Container
|
||||
|
||||
err = tok.Unmarshal(v.binTokenSession)
|
||||
if err != nil {
|
||||
return fmt.Errorf("decode session token: %w", err)
|
||||
}
|
||||
|
||||
if !tok.VerifySignature() {
|
||||
return errors.New("invalid session token signature")
|
||||
}
|
||||
|
||||
// FIXME(@cthulhu-rider): #1387 check token is signed by container owner, see neofs-sdk-go#233
|
||||
|
||||
if keyProvided && !tok.AssertAuthKey(&key) {
|
||||
return errors.New("signed with a non-session key")
|
||||
}
|
||||
|
||||
if !tok.AssertVerb(v.verb) {
|
||||
return errWrongSessionVerb
|
||||
}
|
||||
|
||||
if v.idContainerSet && !tok.AppliedTo(v.idContainer) {
|
||||
return errWrongCID
|
||||
}
|
||||
|
||||
if !session.IssuedBy(tok, v.ownerContainer) {
|
||||
return errors.New("owner differs with token owner")
|
||||
}
|
||||
|
||||
err = cp.checkTokenLifetime(tok)
|
||||
if err != nil {
|
||||
return fmt.Errorf("check session lifetime: %w", err)
|
||||
}
|
||||
|
||||
if !tok.VerifySessionDataSignature(v.signedData, v.signature) {
|
||||
return errors.New("invalid signature calculated with session key")
|
||||
}
|
||||
|
||||
return nil
|
||||
return cp.verifyByTokenSession(v, &key, keyProvided)
|
||||
}
|
||||
|
||||
if keyProvided {
|
||||
|
@ -145,3 +105,45 @@ func (cp *Processor) checkTokenLifetime(token session.Container) error {
|
|||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cp *Processor) verifyByTokenSession(v signatureVerificationData, key *frostfsecdsa.PublicKeyRFC6979, keyProvided bool) error {
|
||||
var tok session.Container
|
||||
|
||||
err := tok.Unmarshal(v.binTokenSession)
|
||||
if err != nil {
|
||||
return fmt.Errorf("decode session token: %w", err)
|
||||
}
|
||||
|
||||
if !tok.VerifySignature() {
|
||||
return errors.New("invalid session token signature")
|
||||
}
|
||||
|
||||
// FIXME(@cthulhu-rider): #1387 check token is signed by container owner, see neofs-sdk-go#233
|
||||
|
||||
if keyProvided && !tok.AssertAuthKey(key) {
|
||||
return errors.New("signed with a non-session key")
|
||||
}
|
||||
|
||||
if !tok.AssertVerb(v.verb) {
|
||||
return errWrongSessionVerb
|
||||
}
|
||||
|
||||
if v.idContainerSet && !tok.AppliedTo(v.idContainer) {
|
||||
return errWrongCID
|
||||
}
|
||||
|
||||
if !session.IssuedBy(tok, v.ownerContainer) {
|
||||
return errors.New("owner differs with token owner")
|
||||
}
|
||||
|
||||
err = cp.checkTokenLifetime(tok)
|
||||
if err != nil {
|
||||
return fmt.Errorf("check session lifetime: %w", err)
|
||||
}
|
||||
|
||||
if !tok.VerifySessionDataSignature(v.signedData, v.signature) {
|
||||
return errors.New("invalid signature calculated with session key")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -18,7 +18,6 @@ const (
|
|||
alphabetUpdateIDPrefix = "AlphabetUpdate"
|
||||
)
|
||||
|
||||
// nolint: funlen
|
||||
func (gp *Processor) processAlphabetSync(txHash util.Uint256) {
|
||||
if !gp.alphabetState.IsAlphabet() {
|
||||
gp.log.Info("non alphabet mode, ignore alphabet sync")
|
||||
|
@ -69,79 +68,13 @@ func (gp *Processor) processAlphabetSync(txHash util.Uint256) {
|
|||
}
|
||||
|
||||
// 2. Update NeoFSAlphabet role in the sidechain.
|
||||
innerRing, err := gp.irFetcher.InnerRingKeys()
|
||||
if err != nil {
|
||||
gp.log.Error("can't fetch inner ring list from side chain",
|
||||
zap.String("error", err.Error()))
|
||||
} else {
|
||||
newInnerRing, err := updateInnerRing(innerRing, sidechainAlphabet, newAlphabet)
|
||||
if err != nil {
|
||||
gp.log.Error("can't create new inner ring list with new alphabet keys",
|
||||
zap.String("error", err.Error()))
|
||||
} else {
|
||||
sort.Sort(newInnerRing)
|
||||
gp.updateNeoFSAlphabetRoleInSidechain(sidechainAlphabet, newAlphabet, txHash)
|
||||
|
||||
gp.log.Info("update of the inner ring list",
|
||||
zap.String("before", prettyKeys(innerRing)),
|
||||
zap.String("after", prettyKeys(newInnerRing)),
|
||||
)
|
||||
|
||||
if gp.notaryDisabled {
|
||||
updPrm := nmClient.UpdateIRPrm{}
|
||||
|
||||
updPrm.SetKeys(newInnerRing)
|
||||
updPrm.SetHash(txHash)
|
||||
|
||||
err = gp.netmapClient.UpdateInnerRing(updPrm)
|
||||
} else {
|
||||
updPrm := client.UpdateAlphabetListPrm{}
|
||||
|
||||
updPrm.SetList(newInnerRing)
|
||||
updPrm.SetHash(txHash)
|
||||
|
||||
err = gp.morphClient.UpdateNeoFSAlphabetList(updPrm)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
gp.log.Error("can't update inner ring list with new alphabet keys",
|
||||
zap.String("error", err.Error()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if !gp.notaryDisabled {
|
||||
// 3. Update notary role in the sidechain.
|
||||
|
||||
updPrm := client.UpdateNotaryListPrm{}
|
||||
|
||||
updPrm.SetList(newAlphabet)
|
||||
updPrm.SetHash(txHash)
|
||||
|
||||
err = gp.morphClient.UpdateNotaryList(updPrm)
|
||||
if err != nil {
|
||||
gp.log.Error("can't update list of notary nodes in side chain",
|
||||
zap.String("error", err.Error()))
|
||||
}
|
||||
}
|
||||
// 3. Update notary role in the sidechain.
|
||||
gp.updateNotaryRoleInSidechain(newAlphabet, txHash)
|
||||
|
||||
// 4. Update FrostFS contract in the mainnet.
|
||||
epoch := gp.epochState.EpochCounter()
|
||||
|
||||
buf := make([]byte, 8)
|
||||
binary.LittleEndian.PutUint64(buf, epoch)
|
||||
|
||||
id := append([]byte(alphabetUpdateIDPrefix), buf...)
|
||||
|
||||
prm := frostfscontract.AlphabetUpdatePrm{}
|
||||
|
||||
prm.SetID(id)
|
||||
prm.SetPubs(newAlphabet)
|
||||
|
||||
err = gp.frostfsClient.AlphabetUpdate(prm)
|
||||
if err != nil {
|
||||
gp.log.Error("can't update list of alphabet nodes in frostfs contract",
|
||||
zap.String("error", err.Error()))
|
||||
}
|
||||
gp.updateFrostFSContractInMainnet(newAlphabet)
|
||||
|
||||
gp.log.Info("finished alphabet list update")
|
||||
}
|
||||
|
@ -157,3 +90,84 @@ func prettyKeys(keys keys.PublicKeys) string {
|
|||
|
||||
return strings.TrimRight(sb.String(), delimiter)
|
||||
}
|
||||
|
||||
func (gp *Processor) updateNeoFSAlphabetRoleInSidechain(sidechainAlphabet, newAlphabet keys.PublicKeys, txHash util.Uint256) {
|
||||
innerRing, err := gp.irFetcher.InnerRingKeys()
|
||||
if err != nil {
|
||||
gp.log.Error("can't fetch inner ring list from side chain",
|
||||
zap.String("error", err.Error()))
|
||||
return
|
||||
}
|
||||
|
||||
newInnerRing, err := updateInnerRing(innerRing, sidechainAlphabet, newAlphabet)
|
||||
if err != nil {
|
||||
gp.log.Error("can't create new inner ring list with new alphabet keys",
|
||||
zap.String("error", err.Error()))
|
||||
return
|
||||
}
|
||||
|
||||
sort.Sort(newInnerRing)
|
||||
|
||||
gp.log.Info("update of the inner ring list",
|
||||
zap.String("before", prettyKeys(innerRing)),
|
||||
zap.String("after", prettyKeys(newInnerRing)),
|
||||
)
|
||||
|
||||
if gp.notaryDisabled {
|
||||
updPrm := nmClient.UpdateIRPrm{}
|
||||
|
||||
updPrm.SetKeys(newInnerRing)
|
||||
updPrm.SetHash(txHash)
|
||||
|
||||
err = gp.netmapClient.UpdateInnerRing(updPrm)
|
||||
} else {
|
||||
updPrm := client.UpdateAlphabetListPrm{}
|
||||
|
||||
updPrm.SetList(newInnerRing)
|
||||
updPrm.SetHash(txHash)
|
||||
|
||||
err = gp.morphClient.UpdateNeoFSAlphabetList(updPrm)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
gp.log.Error("can't update inner ring list with new alphabet keys",
|
||||
zap.String("error", err.Error()))
|
||||
}
|
||||
}
|
||||
|
||||
func (gp *Processor) updateNotaryRoleInSidechain(newAlphabet keys.PublicKeys, txHash util.Uint256) {
|
||||
if gp.notaryDisabled {
|
||||
return
|
||||
}
|
||||
|
||||
updPrm := client.UpdateNotaryListPrm{}
|
||||
|
||||
updPrm.SetList(newAlphabet)
|
||||
updPrm.SetHash(txHash)
|
||||
|
||||
err := gp.morphClient.UpdateNotaryList(updPrm)
|
||||
if err != nil {
|
||||
gp.log.Error("can't update list of notary nodes in side chain",
|
||||
zap.String("error", err.Error()))
|
||||
}
|
||||
}
|
||||
|
||||
func (gp *Processor) updateFrostFSContractInMainnet(newAlphabet keys.PublicKeys) {
|
||||
epoch := gp.epochState.EpochCounter()
|
||||
|
||||
buf := make([]byte, 8)
|
||||
binary.LittleEndian.PutUint64(buf, epoch)
|
||||
|
||||
id := append([]byte(alphabetUpdateIDPrefix), buf...)
|
||||
|
||||
prm := frostfscontract.AlphabetUpdatePrm{}
|
||||
|
||||
prm.SetID(id)
|
||||
prm.SetPubs(newAlphabet)
|
||||
|
||||
err := gp.frostfsClient.AlphabetUpdate(prm)
|
||||
if err != nil {
|
||||
gp.log.Error("can't update list of alphabet nodes in frostfs contract",
|
||||
zap.String("error", err.Error()))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -100,12 +100,11 @@ func (c *ClientCache) getSG(ctx context.Context, addr oid.Address, nm *netmap.Ne
|
|||
continue
|
||||
}
|
||||
|
||||
cctx, cancel := context.WithTimeout(ctx, c.sgTimeout)
|
||||
getObjPrm.SetContext(cctx)
|
||||
ctx, cancel := context.WithTimeout(ctx, c.sgTimeout)
|
||||
|
||||
// NOTE: we use the function which does not verify object integrity (checksums, signature),
|
||||
// but it would be useful to do as part of a data audit.
|
||||
res, err := cli.GetObject(getObjPrm)
|
||||
res, err := cli.GetObject(ctx, getObjPrm)
|
||||
|
||||
cancel()
|
||||
|
||||
|
@ -223,10 +222,9 @@ func (c ClientCache) ListSG(dst *storagegroup2.SearchSGDst, prm storagegroup2.Se
|
|||
|
||||
var cliPrm frostfsapiclient.SearchSGPrm
|
||||
|
||||
cliPrm.SetContext(prm.Context)
|
||||
cliPrm.SetContainerID(prm.Container)
|
||||
|
||||
res, err := cli.SearchSG(cliPrm)
|
||||
res, err := cli.SearchSG(prm.Context, cliPrm)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue