package main import ( "context" "errors" "fmt" "time" morphconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/morph" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client" nmClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event" netmapEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/netmap" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/subscriber" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/rand" "github.com/nspcc-dev/neo-go/pkg/core/block" "github.com/nspcc-dev/neo-go/pkg/core/state" "github.com/nspcc-dev/neo-go/pkg/neorpc/result" "github.com/nspcc-dev/neo-go/pkg/rpcclient/waiter" "github.com/nspcc-dev/neo-go/pkg/smartcontract/trigger" "github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neo-go/pkg/vm/vmstate" "go.uber.org/zap" ) const ( newEpochNotification = "NewEpoch" ) func (c *cfg) initMorphComponents(ctx context.Context) { c.cfgMorph.guard.Lock() defer c.cfgMorph.guard.Unlock() if c.cfgMorph.initialized { return } initMorphClient(ctx, c) lookupScriptHashesInNNS(c) // smart contract auto negotiation if c.cfgMorph.notaryEnabled { err := c.cfgMorph.client.EnableNotarySupport( client.WithProxyContract( c.cfgMorph.proxyScriptHash, ), ) fatalOnErr(err) } c.log.Info(logs.FrostFSNodeNotarySupport, zap.Bool("sidechain_enabled", c.cfgMorph.notaryEnabled), ) wrap, err := nmClient.NewFromMorph(c.cfgMorph.client, c.cfgNetmap.scriptHash, 0, nmClient.TryNotary()) fatalOnErr(err) var netmapSource netmap.Source c.cfgMorph.containerCacheSize = morphconfig.ContainerCacheSize(c.appCfg) c.cfgMorph.cacheTTL = morphconfig.CacheTTL(c.appCfg) if c.cfgMorph.cacheTTL == 0 { msPerBlock, err := c.cfgMorph.client.MsPerBlock() fatalOnErr(err) c.cfgMorph.cacheTTL = time.Duration(msPerBlock) * time.Millisecond c.log.Debug(logs.FrostFSNodeMorphcacheTTLFetchedFromNetwork, zap.Duration("value", c.cfgMorph.cacheTTL)) } if c.cfgMorph.cacheTTL < 0 { netmapSource = wrap } else { // use RPC node as source of netmap (with caching) netmapSource = newCachedNetmapStorage(c.cfgNetmap.state, wrap) } c.netMapSource = netmapSource c.cfgNetmap.wrapper = wrap c.cfgMorph.initialized = true } func initMorphClient(ctx context.Context, c *cfg) { addresses := morphconfig.RPCEndpoint(c.appCfg) // Morph client stable-sorts endpoints by priority. Shuffle here to randomize // order of endpoints with the same priority. rand.Shuffle(len(addresses), func(i, j int) { addresses[i], addresses[j] = addresses[j], addresses[i] }) cli, err := client.New(ctx, c.key, client.WithDialTimeout(morphconfig.DialTimeout(c.appCfg)), client.WithLogger(c.log), client.WithMetrics(c.metricsCollector.MorphClientMetrics()), client.WithEndpoints(addresses...), client.WithConnLostCallback(func() { c.internalErr <- errors.New("morph connection has been lost") }), client.WithSwitchInterval(morphconfig.SwitchInterval(c.appCfg)), client.WithMorphCacheMetrics(c.metricsCollector.MorphCacheMetrics()), client.WithDialerSource(c.dialerSource), ) if err != nil { c.log.Info(logs.FrostFSNodeFailedToCreateNeoRPCClient, zap.Any("endpoints", addresses), zap.String("error", err.Error()), ) fatalOnErr(err) } c.onShutdown(func() { c.log.Info(logs.FrostFSNodeClosingMorphComponents) cli.Close() }) if err := cli.SetGroupSignerScope(); err != nil { c.log.Info(logs.FrostFSNodeFailedToSetGroupSignerScopeContinueWithGlobal, zap.Error(err)) } c.cfgMorph.client = cli c.cfgMorph.notaryEnabled = cli.ProbeNotary() } func makeAndWaitNotaryDeposit(ctx context.Context, c *cfg) { // skip notary deposit in non-notary environments if !c.cfgMorph.notaryEnabled { return } tx, vub, err := makeNotaryDeposit(c) fatalOnErr(err) if tx.Equals(util.Uint256{}) { // non-error deposit with an empty TX hash means // that the deposit has already been made; no // need to wait it. c.log.Info(logs.FrostFSNodeNotaryDepositHasAlreadyBeenMade) return } err = waitNotaryDeposit(ctx, c, tx, vub) fatalOnErr(err) } func makeNotaryDeposit(c *cfg) (util.Uint256, uint32, error) { const ( // gasMultiplier defines how many times more the notary // balance must be compared to the GAS balance of the node: // notaryBalance = GASBalance * gasMultiplier gasMultiplier = 3 // gasDivisor defines what part of GAS balance (1/gasDivisor) // should be transferred to the notary service gasDivisor = 2 ) depositAmount, err := client.CalculateNotaryDepositAmount(c.cfgMorph.client, gasMultiplier, gasDivisor) if err != nil { return util.Uint256{}, 0, fmt.Errorf("could not calculate notary deposit: %w", err) } return c.cfgMorph.client.DepositEndlessNotary(depositAmount) } var ( errNotaryDepositFail = errors.New("notary deposit tx has faulted") errNotaryDepositTimeout = errors.New("notary deposit tx has not appeared in the network") ) type waiterClient struct { c *client.Client } func (w *waiterClient) Context() context.Context { return context.Background() } func (w *waiterClient) GetApplicationLog(hash util.Uint256, trig *trigger.Type) (*result.ApplicationLog, error) { return w.c.GetApplicationLog(hash, trig) } func (w *waiterClient) GetBlockCount() (uint32, error) { return w.c.BlockCount() } func (w *waiterClient) GetVersion() (*result.Version, error) { return w.c.GetVersion() } func waitNotaryDeposit(ctx context.Context, c *cfg, tx util.Uint256, vub uint32) error { w, err := waiter.NewPollingBased(&waiterClient{c: c.cfgMorph.client}) if err != nil { return fmt.Errorf("could not create notary deposit waiter: %w", err) } res, err := w.WaitAny(ctx, vub, tx) if err != nil { if errors.Is(err, waiter.ErrTxNotAccepted) { return errNotaryDepositTimeout } return fmt.Errorf("could not wait for notary deposit persists in chain: %w", err) } if res.Execution.VMState.HasFlag(vmstate.Halt) { c.log.Info(logs.ClientNotaryDepositTransactionWasSuccessfullyPersisted) return nil } return errNotaryDepositFail } func listenMorphNotifications(ctx context.Context, c *cfg) { var ( err error subs subscriber.Subscriber ) fromSideChainBlock, err := c.persistate.UInt32(persistateSideChainLastBlockKey) if err != nil { fromSideChainBlock = 0 c.log.Warn(logs.FrostFSNodeCantGetLastProcessedSideChainBlockNumber, zap.String("error", err.Error())) } subs, err = subscriber.New(ctx, &subscriber.Params{ Log: c.log, StartFromBlock: fromSideChainBlock, Client: c.cfgMorph.client, }) fatalOnErr(err) lis, err := event.NewListener(event.ListenerParams{ Logger: c.log, Subscriber: subs, }) fatalOnErr(err) c.onShutdown(func() { lis.Stop() }) c.workers = append(c.workers, newWorkerFromFunc(func(wCtx context.Context) { runAndLog(wCtx, c, "morph notification", false, func(lCtx context.Context, c *cfg) { lis.ListenWithError(lCtx, c.internalErr) }) })) setNetmapNotificationParser(c, newEpochNotification, func(src *state.ContainedNotificationEvent) (event.Event, error) { res, err := netmapEvent.ParseNewEpoch(src) if err == nil { c.log.Info(logs.FrostFSNodeNewEpochEventFromSidechain, zap.Uint64("number", res.(netmapEvent.NewEpoch).EpochNumber()), ) } return res, err }) registerNotificationHandlers(c.cfgNetmap.scriptHash, lis, c.cfgNetmap.parsers, c.cfgNetmap.subscribers) registerNotificationHandlers(c.cfgContainer.scriptHash, lis, c.cfgContainer.parsers, c.cfgContainer.subscribers) registerBlockHandler(lis, func(block *block.Block) { c.log.Debug(logs.FrostFSNodeNewBlock, zap.Uint32("index", block.Index)) err = c.persistate.SetUInt32(persistateSideChainLastBlockKey, block.Index) if err != nil { c.log.Warn(logs.FrostFSNodeCantUpdatePersistentState, zap.String("chain", "side"), zap.Uint32("block_index", block.Index)) } }) } func registerNotificationHandlers(scHash util.Uint160, lis event.Listener, parsers map[event.Type]event.NotificationParser, subs map[event.Type][]event.Handler, ) { for typ, handlers := range subs { pi := event.NotificationParserInfo{} pi.SetType(typ) pi.SetScriptHash(scHash) p, ok := parsers[typ] if !ok { panic(fmt.Sprintf("missing parser for event %s", typ)) } pi.SetParser(p) lis.SetNotificationParser(pi) for _, h := range handlers { hi := event.NotificationHandlerInfo{} hi.SetType(typ) hi.SetScriptHash(scHash) hi.SetHandler(h) lis.RegisterNotificationHandler(hi) } } } func registerBlockHandler(lis event.Listener, handler event.BlockHandler) { lis.RegisterBlockHandler(handler) } // lookupScriptHashesInNNS looks up for contract script hashes in NNS contract of side // chain if they were not specified in config file. func lookupScriptHashesInNNS(c *cfg) { var ( err error emptyHash = util.Uint160{} targets = [...]struct { h *util.Uint160 nnsName string }{ {&c.cfgNetmap.scriptHash, client.NNSNetmapContractName}, {&c.cfgAccounting.scriptHash, client.NNSBalanceContractName}, {&c.cfgContainer.scriptHash, client.NNSContainerContractName}, {&c.cfgFrostfsID.scriptHash, client.NNSFrostFSIDContractName}, {&c.cfgMorph.proxyScriptHash, client.NNSProxyContractName}, {&c.cfgObject.cfgAccessPolicyEngine.policyContractHash, client.NNSPolicyContractName}, } ) for _, t := range targets { if t.nnsName == client.NNSProxyContractName && !c.cfgMorph.notaryEnabled { continue // ignore proxy contract if notary disabled } if emptyHash.Equals(*t.h) { *t.h, err = c.cfgMorph.client.NNSContractAddress(t.nnsName) fatalOnErrDetails(fmt.Sprintf("can't resolve %s in NNS", t.nnsName), err) } } }