forked from TrueCloudLab/frostfs-node
[#603] cmd/node: Separate configuration and netmap node info
Config `NodeInfo` should be used for bootstrap. Separete local node info and netmap one. Return configured `NodeInfo` if structure from netmap is missing. Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
parent
5a4c3dfddf
commit
6279b6343f
4 changed files with 60 additions and 84 deletions
|
@ -103,8 +103,6 @@ type cfg struct {
|
|||
|
||||
cfgControlService cfgControlService
|
||||
|
||||
netStatus *atomic.Int32
|
||||
|
||||
healthStatus *atomic.Int32
|
||||
|
||||
closers []func()
|
||||
|
@ -167,10 +165,7 @@ type BootstrapType uint32
|
|||
|
||||
type cfgNodeInfo struct {
|
||||
// values from config
|
||||
|
||||
// values at runtime
|
||||
infoMtx sync.RWMutex
|
||||
info netmap.NodeInfo
|
||||
localInfo netmap.NodeInfo
|
||||
}
|
||||
|
||||
type cfgObject struct {
|
||||
|
@ -307,7 +302,6 @@ func initCfg(path string) *cfg {
|
|||
cfgObject: cfgObject{
|
||||
pool: initObjectPool(appCfg),
|
||||
},
|
||||
netStatus: atomic.NewInt32(int32(control.NetmapStatus_STATUS_UNDEFINED)),
|
||||
healthStatus: atomic.NewInt32(int32(control.HealthStatus_HEALTH_STATUS_UNDEFINED)),
|
||||
cfgReputation: cfgReputation{
|
||||
scriptHash: contractsconfig.Reputation(appCfg),
|
||||
|
@ -454,63 +448,22 @@ func initObjectPool(cfg *config.Config) (pool cfgObjectRoutines) {
|
|||
}
|
||||
|
||||
func (c *cfg) LocalNodeInfo() (*netmapV2.NodeInfo, error) {
|
||||
ni := c.localNodeInfo()
|
||||
return ni.ToV2(), nil
|
||||
}
|
||||
|
||||
// handleLocalNodeInfo rewrites local node info
|
||||
func (c *cfg) handleLocalNodeInfo(ni *netmap.NodeInfo) {
|
||||
c.cfgNodeInfo.infoMtx.Lock()
|
||||
|
||||
ni := c.cfgNetmap.state.getNodeInfo()
|
||||
if ni != nil {
|
||||
c.cfgNodeInfo.info = *ni
|
||||
return ni.ToV2(), nil
|
||||
}
|
||||
|
||||
c.updateStatusWithoutLock(ni)
|
||||
|
||||
c.cfgNodeInfo.infoMtx.Unlock()
|
||||
return c.cfgNodeInfo.localInfo.ToV2(), nil
|
||||
}
|
||||
|
||||
// handleNodeInfoStatus updates node info status without rewriting whole local
|
||||
// node info status
|
||||
func (c *cfg) handleNodeInfoStatus(ni *netmap.NodeInfo) {
|
||||
c.cfgNodeInfo.infoMtx.Lock()
|
||||
|
||||
c.updateStatusWithoutLock(ni)
|
||||
|
||||
c.cfgNodeInfo.infoMtx.Unlock()
|
||||
// handleLocalNodeInfo rewrites local node info from netmap
|
||||
func (c *cfg) handleLocalNodeInfo(ni *netmap.NodeInfo) {
|
||||
c.cfgNetmap.state.setNodeInfo(ni)
|
||||
}
|
||||
|
||||
func (c *cfg) localNodeInfo() netmap.NodeInfo {
|
||||
c.cfgNodeInfo.infoMtx.RLock()
|
||||
defer c.cfgNodeInfo.infoMtx.RUnlock()
|
||||
|
||||
return c.cfgNodeInfo.info
|
||||
}
|
||||
|
||||
func (c *cfg) toOnlineLocalNodeInfo() *netmap.NodeInfo {
|
||||
ni := c.localNodeInfo()
|
||||
func (c *cfg) bootstrap() error {
|
||||
ni := c.cfgNodeInfo.localInfo
|
||||
ni.SetState(netmap.NodeStateOnline)
|
||||
|
||||
return &ni
|
||||
}
|
||||
|
||||
func (c *cfg) updateStatusWithoutLock(ni *netmap.NodeInfo) {
|
||||
var nmState netmap.NodeState
|
||||
|
||||
if ni != nil {
|
||||
nmState = ni.State()
|
||||
} else {
|
||||
nmState = netmap.NodeStateOffline
|
||||
c.cfgNodeInfo.info.SetState(nmState)
|
||||
}
|
||||
|
||||
switch nmState {
|
||||
default:
|
||||
c.setNetmapStatus(control.NetmapStatus_STATUS_UNDEFINED)
|
||||
case netmap.NodeStateOnline:
|
||||
c.setNetmapStatus(control.NetmapStatus_ONLINE)
|
||||
case netmap.NodeStateOffline:
|
||||
c.setNetmapStatus(control.NetmapStatus_OFFLINE)
|
||||
}
|
||||
return c.cfgNetmap.wrapper.AddPeer(&ni)
|
||||
}
|
||||
|
|
|
@ -365,25 +365,19 @@ type usedSpaceService struct {
|
|||
}
|
||||
|
||||
func (c *cfg) PublicKey() []byte {
|
||||
ni := c.localNodeInfo()
|
||||
return ni.PublicKey()
|
||||
return nodeKeyFromNetmap(c)
|
||||
}
|
||||
|
||||
func (c *cfg) Address() string {
|
||||
ni := c.localNodeInfo()
|
||||
return ni.Address()
|
||||
return nodeAddressFromNetmap(c)
|
||||
}
|
||||
|
||||
func (c *usedSpaceService) PublicKey() []byte {
|
||||
ni := c.cfg.localNodeInfo()
|
||||
|
||||
return ni.PublicKey()
|
||||
return nodeKeyFromNetmap(c.cfg)
|
||||
}
|
||||
|
||||
func (c *usedSpaceService) Address() string {
|
||||
ni := c.cfg.localNodeInfo()
|
||||
|
||||
return ni.Address()
|
||||
return nodeAddressFromNetmap(c.cfg)
|
||||
}
|
||||
|
||||
func (c *usedSpaceService) AnnounceUsedSpace(ctx context.Context, req *containerV2.AnnounceUsedSpaceRequest) (*containerV2.AnnounceUsedSpaceResponse, error) {
|
||||
|
|
|
@ -75,12 +75,8 @@ func initControlService(c *cfg) {
|
|||
}))
|
||||
}
|
||||
|
||||
func (c *cfg) setNetmapStatus(st control.NetmapStatus) {
|
||||
c.netStatus.Store(int32(st))
|
||||
}
|
||||
|
||||
func (c *cfg) NetmapStatus() control.NetmapStatus {
|
||||
return control.NetmapStatus(c.netStatus.Load())
|
||||
return c.cfgNetmap.state.controlNetmapStatus()
|
||||
}
|
||||
|
||||
func (c *cfg) setHealthStatus(st control.HealthStatus) {
|
||||
|
|
|
@ -20,6 +20,10 @@ import (
|
|||
// primary solution of local network state dump.
|
||||
type networkState struct {
|
||||
epoch *atomic.Uint64
|
||||
|
||||
controlNetStatus atomic.Value // control.NetmapStatus
|
||||
|
||||
nodeInfo atomic.Value // *netmapSDK.NodeInfo
|
||||
}
|
||||
|
||||
func newNetworkState() *networkState {
|
||||
|
@ -36,14 +40,44 @@ func (s *networkState) setCurrentEpoch(v uint64) {
|
|||
s.epoch.Store(v)
|
||||
}
|
||||
|
||||
func initNetmapService(c *cfg) {
|
||||
peerInfo := new(netmapSDK.NodeInfo)
|
||||
peerInfo.SetAddress(c.localAddr.String())
|
||||
peerInfo.SetPublicKey(crypto.MarshalPublicKey(&c.key.PublicKey))
|
||||
peerInfo.SetAttributes(parseAttributes(c.appCfg)...)
|
||||
peerInfo.SetState(netmapSDK.NodeStateOffline)
|
||||
func (s *networkState) setNodeInfo(ni *netmapSDK.NodeInfo) {
|
||||
s.nodeInfo.Store(ni)
|
||||
|
||||
c.handleLocalNodeInfo(peerInfo)
|
||||
var ctrlNetSt control.NetmapStatus
|
||||
|
||||
switch ni.State() {
|
||||
default:
|
||||
ctrlNetSt = control.NetmapStatus_STATUS_UNDEFINED
|
||||
case netmapSDK.NodeStateOnline:
|
||||
ctrlNetSt = control.NetmapStatus_ONLINE
|
||||
case netmapSDK.NodeStateOffline:
|
||||
ctrlNetSt = control.NetmapStatus_OFFLINE
|
||||
}
|
||||
|
||||
s.controlNetStatus.Store(ctrlNetSt)
|
||||
}
|
||||
|
||||
func (s *networkState) controlNetmapStatus() control.NetmapStatus {
|
||||
return s.controlNetStatus.Load().(control.NetmapStatus)
|
||||
}
|
||||
|
||||
func (s *networkState) getNodeInfo() *netmapSDK.NodeInfo {
|
||||
return s.nodeInfo.Load().(*netmapSDK.NodeInfo)
|
||||
}
|
||||
|
||||
func nodeKeyFromNetmap(c *cfg) []byte {
|
||||
return c.cfgNetmap.state.getNodeInfo().PublicKey()
|
||||
}
|
||||
|
||||
func nodeAddressFromNetmap(c *cfg) string {
|
||||
return c.cfgNetmap.state.getNodeInfo().Address()
|
||||
}
|
||||
|
||||
func initNetmapService(c *cfg) {
|
||||
c.cfgNodeInfo.localInfo.SetAddress(c.localAddr.String())
|
||||
c.cfgNodeInfo.localInfo.SetPublicKey(crypto.MarshalPublicKey(&c.key.PublicKey))
|
||||
c.cfgNodeInfo.localInfo.SetAttributes(parseAttributes(c.appCfg)...)
|
||||
c.cfgNodeInfo.localInfo.SetState(netmapSDK.NodeStateOffline)
|
||||
|
||||
if c.cfgMorph.client == nil {
|
||||
initMorphComponents(c)
|
||||
|
@ -82,7 +116,7 @@ func initNetmapService(c *cfg) {
|
|||
const reBootstrapInterval = 2
|
||||
|
||||
if (n-c.cfgNetmap.startEpoch)%reBootstrapInterval == 0 {
|
||||
err := c.cfgNetmap.wrapper.AddPeer(c.toOnlineLocalNodeInfo())
|
||||
err := c.bootstrap()
|
||||
if err != nil {
|
||||
c.log.Warn("can't send re-bootstrap tx", zap.Error(err))
|
||||
}
|
||||
|
@ -109,7 +143,7 @@ func initNetmapService(c *cfg) {
|
|||
func bootstrapNode(c *cfg) {
|
||||
initState(c)
|
||||
|
||||
err := c.cfgNetmap.wrapper.AddPeer(c.toOnlineLocalNodeInfo())
|
||||
err := c.bootstrap()
|
||||
fatalOnErrDetails("bootstrap error", err)
|
||||
}
|
||||
|
||||
|
@ -140,8 +174,6 @@ func initState(c *cfg) {
|
|||
ni, err := c.netmapLocalNodeState(epoch)
|
||||
fatalOnErrDetails("could not init network state", err)
|
||||
|
||||
c.handleNodeInfoStatus(ni)
|
||||
|
||||
c.log.Info("initial network state",
|
||||
zap.Uint64("epoch", epoch),
|
||||
zap.Stringer("state", ni.State()),
|
||||
|
@ -149,6 +181,7 @@ func initState(c *cfg) {
|
|||
|
||||
c.cfgNetmap.state.setCurrentEpoch(epoch)
|
||||
c.cfgNetmap.startEpoch = epoch
|
||||
c.cfgNetmap.state.setNodeInfo(ni)
|
||||
}
|
||||
|
||||
func (c *cfg) netmapLocalNodeState(epoch uint64) (*netmapSDK.NodeInfo, error) {
|
||||
|
@ -192,7 +225,7 @@ func addNewEpochAsyncNotificationHandler(c *cfg, h event.Handler) {
|
|||
func (c *cfg) SetNetmapStatus(st control.NetmapStatus) error {
|
||||
if st == control.NetmapStatus_ONLINE {
|
||||
c.cfgNetmap.reBoostrapTurnedOff.Store(false)
|
||||
return c.cfgNetmap.wrapper.AddPeer(c.toOnlineLocalNodeInfo())
|
||||
return c.bootstrap()
|
||||
}
|
||||
|
||||
var apiState netmapSDK.NodeState
|
||||
|
|
Loading…
Reference in a new issue