package main import ( "bytes" "context" "errors" "fmt" "net" "sync/atomic" netmapGRPC "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/netmap/grpc" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/metrics" nmClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event" netmapEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/netmap" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network" netmapTransportGRPC "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network/transport/netmap/grpc" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control" netmapService "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/netmap" netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version" "go.uber.org/zap" "google.golang.org/grpc" ) // primary solution of local network state dump. type networkState struct { epoch *atomic.Uint64 controlNetStatus atomic.Int32 // control.NetmapStatus nodeInfo atomic.Value // *netmapSDK.NodeInfo metrics *metrics.NodeMetrics } func newNetworkState() *networkState { ns := &networkState{ epoch: new(atomic.Uint64), } ns.controlNetStatus.Store(int32(control.NetmapStatus_STATUS_UNDEFINED)) return ns } func (s *networkState) CurrentEpoch() uint64 { return s.epoch.Load() } func (s *networkState) setCurrentEpoch(v uint64) { s.epoch.Store(v) if s.metrics != nil { s.metrics.SetEpoch(v) } } func (s *networkState) setNodeInfo(ni *netmapSDK.NodeInfo) { ctrlNetSt := control.NetmapStatus_STATUS_UNDEFINED if ni != nil { s.nodeInfo.Store(*ni) switch { case ni.IsOnline(): ctrlNetSt = control.NetmapStatus_ONLINE case ni.IsOffline(): ctrlNetSt = control.NetmapStatus_OFFLINE case ni.IsMaintenance(): ctrlNetSt = control.NetmapStatus_MAINTENANCE } } else { ctrlNetSt = control.NetmapStatus_OFFLINE niRaw := s.nodeInfo.Load() if niRaw != nil { niOld := niRaw.(netmapSDK.NodeInfo) // nil ni means that the node is not included // in the netmap niOld.SetOffline() s.nodeInfo.Store(niOld) } } s.setControlNetmapStatus(ctrlNetSt) } // sets the current node state to the given value. Subsequent cfg.bootstrap // calls will process this value to decide what status node should set in the // network. func (s *networkState) setControlNetmapStatus(st control.NetmapStatus) { s.controlNetStatus.Store(int32(st)) } func (s *networkState) controlNetmapStatus() (res control.NetmapStatus) { return control.NetmapStatus(s.controlNetStatus.Load()) } func (s *networkState) getNodeInfo() (res netmapSDK.NodeInfo, ok bool) { v := s.nodeInfo.Load() if v != nil { res, ok = v.(netmapSDK.NodeInfo) if !ok { panic(fmt.Sprintf("unexpected value in atomic node info state: %T", v)) } } return } func nodeKeyFromNetmap(c *cfg) []byte { ni, ok := c.cfgNetmap.state.getNodeInfo() if ok { return ni.PublicKey() } return nil } func (c *cfg) iterateNetworkAddresses(f func(string) bool) { ni, ok := c.cfgNetmap.state.getNodeInfo() if ok { ni.IterateNetworkEndpoints(f) } } func (c *cfg) addressNum() int { ni, ok := c.cfgNetmap.state.getNodeInfo() if ok { return ni.NumberOfNetworkEndpoints() } return 0 } func initNetmapService(ctx context.Context, c *cfg) { network.WriteToNodeInfo(c.localAddr, &c.cfgNodeInfo.localInfo) c.cfgNodeInfo.localInfo.SetPublicKey(c.key.PublicKey().Bytes()) parseAttributes(c) c.cfgNodeInfo.localInfo.SetOffline() if c.cfgMorph.client == nil { initMorphComponents(ctx, c) } initNetmapState(c) server := netmapTransportGRPC.New( netmapService.NewSignService( &c.key.PrivateKey, netmapService.NewExecutionService( c, c.apiVersion, &netInfo{ netState: c.cfgNetmap.state, magic: c.cfgMorph.client, morphClientNetMap: c.cfgNetmap.wrapper, msPerBlockRdr: c.cfgMorph.client.MsPerBlock, }, c.respSvc, ), ), ) c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) { netmapGRPC.RegisterNetmapServiceServer(s, server) }) addNewEpochNotificationHandlers(c) } func addNewEpochNotificationHandlers(c *cfg) { addNewEpochNotificationHandler(c, func(ev event.Event) { c.cfgNetmap.state.setCurrentEpoch(ev.(netmapEvent.NewEpoch).EpochNumber()) }) addNewEpochAsyncNotificationHandler(c, func(_ event.Event) { if !c.needBootstrap() || c.cfgNetmap.reBoostrapTurnedOff.Load() { // fixes #470 return } if err := c.bootstrap(); err != nil { c.log.Warn(logs.FrostFSNodeCantSendRebootstrapTx, zap.Error(err)) } }) addNewEpochAsyncNotificationHandler(c, func(ev event.Event) { e := ev.(netmapEvent.NewEpoch).EpochNumber() ni, err := c.netmapLocalNodeState(e) if err != nil { c.log.Error(logs.FrostFSNodeCouldNotUpdateNodeStateOnNewEpoch, zap.Uint64("epoch", e), zap.String("error", err.Error()), ) return } c.handleLocalNodeInfo(ni) }) if c.cfgMorph.notaryEnabled { addNewEpochAsyncNotificationHandler(c, func(_ event.Event) { _, err := makeNotaryDeposit(c) if err != nil { c.log.Error(logs.FrostFSNodeCouldNotMakeNotaryDeposit, zap.String("error", err.Error()), ) } }) } } // bootstrapNode adds current node to the Network map. // Must be called after initNetmapService. func bootstrapNode(c *cfg) { if c.needBootstrap() { if c.IsMaintenance() { c.log.Info(logs.FrostFSNodeNodeIsUnderMaintenanceSkipInitialBootstrap) return } err := c.bootstrap() fatalOnErrDetails("bootstrap error", err) } } func addNetmapNotificationHandler(c *cfg, sTyp string, h event.Handler) { typ := event.TypeFromString(sTyp) if c.cfgNetmap.subscribers == nil { c.cfgNetmap.subscribers = make(map[event.Type][]event.Handler, 1) } c.cfgNetmap.subscribers[typ] = append(c.cfgNetmap.subscribers[typ], h) } func setNetmapNotificationParser(c *cfg, sTyp string, p event.NotificationParser) { typ := event.TypeFromString(sTyp) if c.cfgNetmap.parsers == nil { c.cfgNetmap.parsers = make(map[event.Type]event.NotificationParser, 1) } c.cfgNetmap.parsers[typ] = p } // initNetmapState inits current Network map state. // Must be called after Morph components initialization. func initNetmapState(c *cfg) { epoch, err := c.cfgNetmap.wrapper.Epoch() fatalOnErrDetails("could not initialize current epoch number", err) var ni *netmapSDK.NodeInfo ni, err = c.netmapInitLocalNodeState(epoch) fatalOnErrDetails("could not init network state", err) stateWord := nodeState(ni) c.log.Info(logs.FrostFSNodeInitialNetworkState, zap.Uint64("epoch", epoch), zap.String("state", stateWord), ) if ni != nil && ni.IsMaintenance() { c.isMaintenance.Store(true) } c.cfgNetmap.state.setCurrentEpoch(epoch) c.cfgNetmap.startEpoch = epoch c.handleLocalNodeInfo(ni) } func nodeState(ni *netmapSDK.NodeInfo) string { if ni != nil { switch { case ni.IsOnline(): return "online" case ni.IsOffline(): return "offline" case ni.IsMaintenance(): return "maintenance" } } return "undefined" } func (c *cfg) netmapInitLocalNodeState(epoch uint64) (*netmapSDK.NodeInfo, error) { nmNodes, err := c.cfgNetmap.wrapper.GetCandidates() if err != nil { return nil, err } var candidate *netmapSDK.NodeInfo for i := range nmNodes { if bytes.Equal(nmNodes[i].PublicKey(), c.binPublicKey) { candidate = &nmNodes[i] break } } node, err := c.netmapLocalNodeState(epoch) if err != nil { return nil, err } if candidate == nil { return node, nil } nmState := nodeState(node) candidateState := nodeState(candidate) if nmState != candidateState { // This happens when the node was switched to maintenance without epoch tick. // We expect it to continue staying in maintenance. c.log.Info(logs.CandidateStatusPriority, zap.String("netmap", nmState), zap.String("candidate", candidateState)) } return candidate, nil } func (c *cfg) netmapLocalNodeState(epoch uint64) (*netmapSDK.NodeInfo, error) { // calculate current network state nm, err := c.cfgNetmap.wrapper.GetNetMapByEpoch(epoch) if err != nil { return nil, err } c.netMap.Store(*nm) nmNodes := nm.Nodes() for i := range nmNodes { if bytes.Equal(nmNodes[i].PublicKey(), c.binPublicKey) { return &nmNodes[i], nil } } return nil, nil } // addNewEpochNotificationHandler adds handler that will be executed synchronously. func addNewEpochNotificationHandler(c *cfg, h event.Handler) { addNetmapNotificationHandler(c, newEpochNotification, h) } // addNewEpochAsyncNotificationHandler adds handler that will be executed asynchronously via netmap workerPool. func addNewEpochAsyncNotificationHandler(c *cfg, h event.Handler) { addNetmapNotificationHandler( c, newEpochNotification, event.WorkerPoolHandler( c.cfgNetmap.workerPool, h, c.log, ), ) } var errRelayBootstrap = errors.New("setting netmap status is forbidden in relay mode") func (c *cfg) SetNetmapStatus(st control.NetmapStatus) error { switch st { default: return fmt.Errorf("unsupported status %v", st) case control.NetmapStatus_MAINTENANCE: return c.setMaintenanceStatus(false) case control.NetmapStatus_ONLINE, control.NetmapStatus_OFFLINE: } c.stopMaintenance() if !c.needBootstrap() { return errRelayBootstrap } if st == control.NetmapStatus_ONLINE { c.cfgNetmap.reBoostrapTurnedOff.Store(false) return bootstrapOnline(c) } c.cfgNetmap.reBoostrapTurnedOff.Store(true) return c.updateNetMapState(func(*nmClient.UpdatePeerPrm) {}) } func (c *cfg) ForceMaintenance() error { return c.setMaintenanceStatus(true) } func (c *cfg) setMaintenanceStatus(force bool) error { netSettings, err := c.cfgNetmap.wrapper.ReadNetworkConfiguration() if err != nil { err = fmt.Errorf("read network settings to check maintenance allowance: %w", err) } else if !netSettings.MaintenanceModeAllowed { err = errors.New("maintenance mode is not allowed by the network") } if err == nil || force { c.startMaintenance() if err == nil { err = c.updateNetMapState((*nmClient.UpdatePeerPrm).SetMaintenance) } if err != nil { return fmt.Errorf("local maintenance is started, but state is not updated in the network: %w", err) } } return err } // calls UpdatePeerState operation of Netmap contract's client for the local node. // State setter is used to specify node state to switch to. func (c *cfg) updateNetMapState(stateSetter func(*nmClient.UpdatePeerPrm)) error { var prm nmClient.UpdatePeerPrm prm.SetKey(c.key.PublicKey().Bytes()) stateSetter(&prm) _, err := c.cfgNetmap.wrapper.UpdatePeerState(prm) return err } type netInfo struct { netState netmap.State magic interface { MagicNumber() (uint64, error) } morphClientNetMap *nmClient.Client msPerBlockRdr func() (int64, error) } func (n *netInfo) Dump(ver version.Version) (*netmapSDK.NetworkInfo, error) { magic, err := n.magic.MagicNumber() if err != nil { return nil, err } var ni netmapSDK.NetworkInfo ni.SetCurrentEpoch(n.netState.CurrentEpoch()) ni.SetMagicNumber(magic) netInfoMorph, err := n.morphClientNetMap.ReadNetworkConfiguration() if err != nil { return nil, fmt.Errorf("read network configuration using netmap contract client: %w", err) } if mjr := ver.Major(); mjr > 2 || mjr == 2 && ver.Minor() > 9 { msPerBlock, err := n.msPerBlockRdr() if err != nil { return nil, fmt.Errorf("ms per block: %w", err) } ni.SetMsPerBlock(msPerBlock) ni.SetMaxObjectSize(netInfoMorph.MaxObjectSize) ni.SetEpochDuration(netInfoMorph.EpochDuration) ni.SetContainerFee(netInfoMorph.ContainerFee) ni.SetNamedContainerFee(netInfoMorph.ContainerAliasFee) ni.SetIRCandidateFee(netInfoMorph.IRCandidateFee) ni.SetWithdrawalFee(netInfoMorph.WithdrawalFee) if netInfoMorph.HomomorphicHashingDisabled { ni.DisableHomomorphicHashing() } if netInfoMorph.MaintenanceModeAllowed { ni.AllowMaintenanceMode() } for i := range netInfoMorph.Raw { ni.SetRawNetworkParameter(netInfoMorph.Raw[i].Name, netInfoMorph.Raw[i].Value) } } return &ni, nil }