package main import ( "context" "errors" "fmt" "time" morphconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/morph" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client" nmClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event" netmapEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/netmap" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/subscriber" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/rand" "github.com/nspcc-dev/neo-go/pkg/core/block" "github.com/nspcc-dev/neo-go/pkg/core/state" "github.com/nspcc-dev/neo-go/pkg/util" "go.uber.org/zap" ) const ( newEpochNotification = "NewEpoch" ) func (c *cfg) initMorphComponents(ctx context.Context) { c.cfgMorph.guard.Lock() defer c.cfgMorph.guard.Unlock() if c.cfgMorph.initialized { return } initMorphClient(ctx, c) lookupScriptHashesInNNS(c) // smart contract auto negotiation err := c.cfgMorph.client.EnableNotarySupport( client.WithProxyContract( c.cfgMorph.proxyScriptHash, ), ) fatalOnErr(err) c.log.Info(ctx, logs.FrostFSNodeNotarySupport) wrap, err := nmClient.NewFromMorph(c.cfgMorph.client, c.cfgNetmap.scriptHash, 0) fatalOnErr(err) var netmapSource netmap.Source c.cfgMorph.containerCacheSize = morphconfig.ContainerCacheSize(c.appCfg) c.cfgMorph.cacheTTL = morphconfig.CacheTTL(c.appCfg) if c.cfgMorph.cacheTTL == 0 { msPerBlock, err := c.cfgMorph.client.MsPerBlock() fatalOnErr(err) c.cfgMorph.cacheTTL = time.Duration(msPerBlock) * time.Millisecond c.log.Debug(ctx, logs.FrostFSNodeMorphcacheTTLFetchedFromNetwork, zap.Duration("value", c.cfgMorph.cacheTTL)) } if c.cfgMorph.cacheTTL < 0 { netmapSource = wrap } else { // use RPC node as source of netmap (with caching) netmapSource = newCachedNetmapStorage(c.cfgNetmap.state, wrap) } c.netMapSource = netmapSource c.cfgNetmap.wrapper = wrap c.cfgMorph.initialized = true } func initMorphClient(ctx context.Context, c *cfg) { addresses := morphconfig.RPCEndpoint(c.appCfg) // Morph client stable-sorts endpoints by priority. Shuffle here to randomize // order of endpoints with the same priority. rand.Shuffle(len(addresses), func(i, j int) { addresses[i], addresses[j] = addresses[j], addresses[i] }) cli, err := client.New(ctx, c.key, client.WithDialTimeout(morphconfig.DialTimeout(c.appCfg)), client.WithLogger(c.log), client.WithMetrics(c.metricsCollector.MorphClientMetrics()), client.WithEndpoints(addresses...), client.WithConnLostCallback(func() { c.internalErr <- errors.New("morph connection has been lost") }), client.WithSwitchInterval(morphconfig.SwitchInterval(c.appCfg)), client.WithMorphCacheMetrics(c.metricsCollector.MorphCacheMetrics()), client.WithDialerSource(c.dialerSource), ) if err != nil { c.log.Info(ctx, logs.FrostFSNodeFailedToCreateNeoRPCClient, zap.Any("endpoints", addresses), zap.Error(err), ) fatalOnErr(err) } c.onShutdown(func() { c.log.Info(ctx, logs.FrostFSNodeClosingMorphComponents) cli.Close() }) if err := cli.SetGroupSignerScope(); err != nil { c.log.Info(ctx, logs.FrostFSNodeFailedToSetGroupSignerScopeContinueWithGlobal, zap.Error(err)) } c.cfgMorph.client = cli } func makeAndWaitNotaryDeposit(ctx context.Context, c *cfg) { tx, vub, err := makeNotaryDeposit(ctx, c) fatalOnErr(err) if tx.Equals(util.Uint256{}) { // non-error deposit with an empty TX hash means // that the deposit has already been made; no // need to wait it. c.log.Info(ctx, logs.FrostFSNodeNotaryDepositHasAlreadyBeenMade) return } err = waitNotaryDeposit(ctx, c, tx, vub) fatalOnErr(err) } func makeNotaryDeposit(ctx context.Context, c *cfg) (util.Uint256, uint32, error) { const ( // gasMultiplier defines how many times more the notary // balance must be compared to the GAS balance of the node: // notaryBalance = GASBalance * gasMultiplier gasMultiplier = 3 // gasDivisor defines what part of GAS balance (1/gasDivisor) // should be transferred to the notary service gasDivisor = 2 ) depositAmount, err := client.CalculateNotaryDepositAmount(c.cfgMorph.client, gasMultiplier, gasDivisor) if err != nil { return util.Uint256{}, 0, fmt.Errorf("could not calculate notary deposit: %w", err) } return c.cfgMorph.client.DepositEndlessNotary(ctx, depositAmount) } func waitNotaryDeposit(ctx context.Context, c *cfg, tx util.Uint256, vub uint32) error { if err := c.cfgMorph.client.WaitTxHalt(ctx, client.InvokeRes{Hash: tx, VUB: vub}); err != nil { return err } c.log.Info(ctx, logs.ClientNotaryDepositTransactionWasSuccessfullyPersisted) return nil } func listenMorphNotifications(ctx context.Context, c *cfg) { var ( err error subs subscriber.Subscriber ) fromSideChainBlock, err := c.persistate.UInt32(persistateSideChainLastBlockKey) if err != nil { fromSideChainBlock = 0 c.log.Warn(ctx, logs.FrostFSNodeCantGetLastProcessedSideChainBlockNumber, zap.Error(err)) } subs, err = subscriber.New(ctx, &subscriber.Params{ Log: c.log, StartFromBlock: fromSideChainBlock, Client: c.cfgMorph.client, }) fatalOnErr(err) lis, err := event.NewListener(event.ListenerParams{ Logger: c.log, Subscriber: subs, }) fatalOnErr(err) c.onShutdown(func() { lis.Stop() }) c.workers = append(c.workers, newWorkerFromFunc(func(wCtx context.Context) { runAndLog(wCtx, c, "morph notification", false, func(lCtx context.Context, c *cfg) { lis.ListenWithError(lCtx, c.internalErr) }) })) setNetmapNotificationParser(c, newEpochNotification, func(src *state.ContainedNotificationEvent) (event.Event, error) { res, err := netmapEvent.ParseNewEpoch(src) if err == nil { c.log.Info(ctx, logs.FrostFSNodeNewEpochEventFromSidechain, zap.Uint64("number", res.(netmapEvent.NewEpoch).EpochNumber()), ) } return res, err }) registerNotificationHandlers(c.cfgNetmap.scriptHash, lis, c.cfgNetmap.parsers, c.cfgNetmap.subscribers) registerNotificationHandlers(c.cfgContainer.scriptHash, lis, c.cfgContainer.parsers, c.cfgContainer.subscribers) registerBlockHandler(lis, func(ctx context.Context, block *block.Block) { c.log.Debug(ctx, logs.FrostFSNodeNewBlock, zap.Uint32("index", block.Index)) err = c.persistate.SetUInt32(persistateSideChainLastBlockKey, block.Index) if err != nil { c.log.Warn(ctx, logs.FrostFSNodeCantUpdatePersistentState, zap.String("chain", "side"), zap.Uint32("block_index", block.Index)) } }) } func registerNotificationHandlers(scHash util.Uint160, lis event.Listener, parsers map[event.Type]event.NotificationParser, subs map[event.Type][]event.Handler, ) { for typ, handlers := range subs { p, ok := parsers[typ] if !ok { panic(fmt.Sprintf("missing parser for event %s", typ)) } lis.RegisterNotificationHandler(event.NotificationHandlerInfo{ Contract: scHash, Type: typ, Parser: p, Handlers: handlers, }) } } func registerBlockHandler(lis event.Listener, handler event.BlockHandler) { lis.RegisterBlockHandler(handler) } // lookupScriptHashesInNNS looks up for contract script hashes in NNS contract of side // chain if they were not specified in config file. func lookupScriptHashesInNNS(c *cfg) { var ( err error emptyHash = util.Uint160{} targets = [...]struct { h *util.Uint160 nnsName string }{ {&c.cfgNetmap.scriptHash, client.NNSNetmapContractName}, {&c.cfgAccounting.scriptHash, client.NNSBalanceContractName}, {&c.cfgContainer.scriptHash, client.NNSContainerContractName}, {&c.cfgFrostfsID.scriptHash, client.NNSFrostFSIDContractName}, {&c.cfgMorph.proxyScriptHash, client.NNSProxyContractName}, {&c.cfgObject.cfgAccessPolicyEngine.policyContractHash, client.NNSPolicyContractName}, } ) for _, t := range targets { if emptyHash.Equals(*t.h) { *t.h, err = c.cfgMorph.client.NNSContractAddress(t.nnsName) fatalOnErrDetails(fmt.Sprintf("can't resolve %s in NNS", t.nnsName), err) } } }