From 198eb06032569e84e1e743644da26f805226affe Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Mon, 25 Jan 2021 11:23:06 +0300 Subject: [PATCH] [#339] cmd/node: Change the way the local NodeInfo structure is stored Implement NodeState interface required by Netmap service. Make a single point of updating the state of the node (for both Netmap and Control services). Protect node info structure from data race. Signed-off-by: Leonard Lyubich --- cmd/neofs-node/config.go | 48 +++++++++++++++++++++++++++++++++++++++- cmd/neofs-node/netmap.go | 41 ++++++++++++++++------------------ 2 files changed, 66 insertions(+), 23 deletions(-) diff --git a/cmd/neofs-node/config.go b/cmd/neofs-node/config.go index c3706f4f6..3bfb4bfb2 100644 --- a/cmd/neofs-node/config.go +++ b/cmd/neofs-node/config.go @@ -15,6 +15,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neofs-api-go/pkg" "github.com/nspcc-dev/neofs-api-go/pkg/netmap" + netmapV2 "github.com/nspcc-dev/neofs-api-go/v2/netmap" crypto "github.com/nspcc-dev/neofs-crypto" "github.com/nspcc-dev/neofs-node/misc" "github.com/nspcc-dev/neofs-node/pkg/core/container" @@ -244,7 +245,8 @@ type cfgNodeInfo struct { attributes []*netmap.NodeAttribute // values at runtime - info *netmap.NodeInfo + infoMtx sync.RWMutex + info netmap.NodeInfo } type cfgObject struct { @@ -646,3 +648,47 @@ func initObjectPool(cfg *viper.Viper) (pool cfgObjectRoutines) { return pool } + +func (c *cfg) LocalNodeInfo() (*netmapV2.NodeInfo, error) { + ni := c.localNodeInfo() + return ni.ToV2(), nil +} + +func (c *cfg) handleLocalNodeInfo(ni *netmap.NodeInfo) { + c.cfgNodeInfo.infoMtx.Lock() + + var nmState netmap.NodeState + + if ni != nil { + c.cfgNodeInfo.info = *ni + nmState = ni.State() + } else { + nmState = netmap.NodeStateOffline + c.cfgNodeInfo.info.SetState(nmState) + } + + switch nmState { + default: + c.setNetmapStatus(control.NetmapStatus_STATUS_UNDEFINED) + case netmap.NodeStateOnline: + c.setNetmapStatus(control.NetmapStatus_ONLINE) + case netmap.NodeStateOffline: + c.setNetmapStatus(control.NetmapStatus_OFFLINE) + } + + c.cfgNodeInfo.infoMtx.Unlock() +} + +func (c *cfg) localNodeInfo() netmap.NodeInfo { + c.cfgNodeInfo.infoMtx.RLock() + defer c.cfgNodeInfo.infoMtx.RUnlock() + + return c.cfgNodeInfo.info +} + +func (c *cfg) toOnlineLocalNodeInfo() *netmap.NodeInfo { + ni := c.localNodeInfo() + ni.SetState(netmap.NodeStateOnline) + + return &ni +} diff --git a/cmd/neofs-node/netmap.go b/cmd/neofs-node/netmap.go index f97b63cbd..35e72310f 100644 --- a/cmd/neofs-node/netmap.go +++ b/cmd/neofs-node/netmap.go @@ -40,8 +40,9 @@ func initNetmapService(c *cfg) { peerInfo.SetAddress(c.localAddr.String()) peerInfo.SetPublicKey(crypto.MarshalPublicKey(&c.key.PublicKey)) peerInfo.SetAttributes(c.cfgNodeInfo.attributes...) + peerInfo.SetState(netmap.NodeStateOffline) - c.cfgNodeInfo.info = peerInfo + c.handleLocalNodeInfo(peerInfo) netmapGRPC.RegisterNetmapServiceServer(c.cfgGRPC.server, netmapTransportGRPC.New( @@ -49,7 +50,7 @@ func initNetmapService(c *cfg) { c.key, netmapService.NewResponseService( netmapService.NewExecutionService( - c.cfgNodeInfo.info.ToV2(), + c, c.apiVersion, ), c.respSvc, @@ -67,7 +68,7 @@ func initNetmapService(c *cfg) { n := ev.(netmapEvent.NewEpoch).EpochNumber() if n%c.cfgNetmap.reBootstrapInterval == 0 { - err := c.cfgNetmap.wrapper.AddPeer(c.cfgNodeInfo.info) + err := c.cfgNetmap.wrapper.AddPeer(c.toOnlineLocalNodeInfo()) if err != nil { c.log.Warn("can't send re-bootstrap tx", zap.Error(err)) } @@ -78,9 +79,9 @@ func initNetmapService(c *cfg) { addNewEpochNotificationHandler(c, func(ev event.Event) { e := ev.(netmapEvent.NewEpoch).EpochNumber() - netStatus, err := c.netmapStatus(e) + ni, err := c.netmapLocalNodeState(e) if err != nil { - c.log.Error("could not update network status on new epoch", + c.log.Error("could not update node state on new epoch", zap.Uint64("epoch", e), zap.String("error", err.Error()), ) @@ -88,14 +89,14 @@ func initNetmapService(c *cfg) { return } - c.setNetmapStatus(netStatus) + c.handleLocalNodeInfo(ni) }) } func bootstrapNode(c *cfg) { initState(c) - err := c.cfgNetmap.wrapper.AddPeer(c.cfgNodeInfo.info) + err := c.cfgNetmap.wrapper.AddPeer(c.toOnlineLocalNodeInfo()) fatalOnErr(errors.Wrap(err, "bootstrap error")) } @@ -123,41 +124,37 @@ func initState(c *cfg) { epoch, err := c.cfgNetmap.wrapper.Epoch() fatalOnErr(errors.Wrap(err, "could not initialize current epoch number")) - netStatus, err := c.netmapStatus(epoch) - fatalOnErr(errors.Wrap(err, "could not init network status")) + ni, err := c.netmapLocalNodeState(epoch) + fatalOnErr(errors.Wrap(err, "could not init network state")) - c.setNetmapStatus(netStatus) + c.handleLocalNodeInfo(ni) c.log.Info("initial network state", zap.Uint64("epoch", epoch), - zap.Stringer("status", netStatus), + zap.Stringer("state", ni.State()), ) c.cfgNetmap.state.setCurrentEpoch(epoch) } -func (c *cfg) netmapStatus(epoch uint64) (control.NetmapStatus, error) { +func (c *cfg) netmapLocalNodeState(epoch uint64) (*netmap.NodeInfo, error) { // calculate current network state nm, err := c.cfgNetmap.wrapper.GetNetMapByEpoch(epoch) if err != nil { - return control.NetmapStatus_STATUS_UNDEFINED, err + return nil, err } - if c.inNetmap(nm) { - return control.NetmapStatus_ONLINE, nil - } - - return control.NetmapStatus_OFFLINE, nil + return c.localNodeInfoFromNetmap(nm), nil } -func (c *cfg) inNetmap(nm *netmap.Netmap) bool { +func (c *cfg) localNodeInfoFromNetmap(nm *netmap.Netmap) *netmap.NodeInfo { for _, n := range nm.Nodes { if bytes.Equal(n.PublicKey(), crypto.MarshalPublicKey(&c.key.PublicKey)) { - return true + return n.NodeInfo } } - return false + return nil } func addNewEpochNotificationHandler(c *cfg, h event.Handler) { @@ -181,7 +178,7 @@ func goOffline(c *cfg) { func (c *cfg) SetNetmapStatus(st control.NetmapStatus) error { if st == control.NetmapStatus_ONLINE { - return c.cfgNetmap.wrapper.AddPeer(c.cfgNodeInfo.info) + return c.cfgNetmap.wrapper.AddPeer(c.toOnlineLocalNodeInfo()) } var apiState netmap.NodeState