forked from TrueCloudLab/frostfs-node
Dmitrii Stepanov
e515dd4582
It could be called for every shard on metabase resync concurrently and it is possible to get state with initialized client but not initialized contract hashes. Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
481 lines
12 KiB
Go
481 lines
12 KiB
Go
package main
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"net"
|
|
"sync/atomic"
|
|
|
|
netmapGRPC "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/netmap/grpc"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
|
nmClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
|
|
netmapEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/netmap"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
|
netmapTransportGRPC "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network/transport/netmap/grpc"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
|
|
netmapService "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/netmap"
|
|
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version"
|
|
"go.uber.org/zap"
|
|
"google.golang.org/grpc"
|
|
)
|
|
|
|
// primary solution of local network state dump.
|
|
type networkState struct {
|
|
epoch *atomic.Uint64
|
|
|
|
controlNetStatus atomic.Int32 // control.NetmapStatus
|
|
|
|
nodeInfo atomic.Value // netmapSDK.NodeInfo
|
|
|
|
metrics *metrics.NodeMetrics
|
|
}
|
|
|
|
func newNetworkState() *networkState {
|
|
ns := &networkState{
|
|
epoch: new(atomic.Uint64),
|
|
}
|
|
ns.controlNetStatus.Store(int32(control.NetmapStatus_STATUS_UNDEFINED))
|
|
return ns
|
|
}
|
|
|
|
func (s *networkState) CurrentEpoch() uint64 {
|
|
return s.epoch.Load()
|
|
}
|
|
|
|
func (s *networkState) setCurrentEpoch(v uint64) {
|
|
s.epoch.Store(v)
|
|
if s.metrics != nil {
|
|
s.metrics.SetEpoch(v)
|
|
}
|
|
}
|
|
|
|
func (s *networkState) setNodeInfo(ni *netmapSDK.NodeInfo) {
|
|
ctrlNetSt := control.NetmapStatus_STATUS_UNDEFINED
|
|
|
|
if ni != nil {
|
|
s.nodeInfo.Store(*ni)
|
|
|
|
switch ni.Status() {
|
|
case netmapSDK.Online:
|
|
ctrlNetSt = control.NetmapStatus_ONLINE
|
|
case netmapSDK.Offline:
|
|
ctrlNetSt = control.NetmapStatus_OFFLINE
|
|
case netmapSDK.Maintenance:
|
|
ctrlNetSt = control.NetmapStatus_MAINTENANCE
|
|
case netmapSDK.UnspecifiedState:
|
|
ctrlNetSt = control.NetmapStatus_STATUS_UNDEFINED
|
|
}
|
|
} else {
|
|
ctrlNetSt = control.NetmapStatus_OFFLINE
|
|
|
|
niRaw := s.nodeInfo.Load()
|
|
if niRaw != nil {
|
|
niOld := niRaw.(netmapSDK.NodeInfo)
|
|
|
|
// nil ni means that the node is not included
|
|
// in the netmap
|
|
niOld.SetStatus(netmapSDK.Offline)
|
|
|
|
s.nodeInfo.Store(niOld)
|
|
}
|
|
}
|
|
|
|
s.setControlNetmapStatus(control.NetmapStatus(ctrlNetSt))
|
|
}
|
|
|
|
// sets the current node state to the given value. Subsequent cfg.bootstrap
|
|
// calls will process this value to decide what status node should set in the
|
|
// network.
|
|
func (s *networkState) setControlNetmapStatus(st control.NetmapStatus) {
|
|
s.controlNetStatus.Store(int32(st))
|
|
}
|
|
|
|
func (s *networkState) controlNetmapStatus() (res control.NetmapStatus) {
|
|
return control.NetmapStatus(s.controlNetStatus.Load())
|
|
}
|
|
|
|
func (s *networkState) getNodeInfo() (res netmapSDK.NodeInfo, ok bool) {
|
|
v := s.nodeInfo.Load()
|
|
if v != nil {
|
|
res, ok = v.(netmapSDK.NodeInfo)
|
|
if !ok {
|
|
panic(fmt.Sprintf("unexpected value in atomic node info state: %T", v))
|
|
}
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func nodeKeyFromNetmap(c *cfg) []byte {
|
|
ni, ok := c.cfgNetmap.state.getNodeInfo()
|
|
if ok {
|
|
return ni.PublicKey()
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *cfg) iterateNetworkAddresses(f func(string) bool) {
|
|
ni, ok := c.cfgNetmap.state.getNodeInfo()
|
|
if ok {
|
|
ni.IterateNetworkEndpoints(f)
|
|
}
|
|
}
|
|
|
|
func (c *cfg) addressNum() int {
|
|
ni, ok := c.cfgNetmap.state.getNodeInfo()
|
|
if ok {
|
|
return ni.NumberOfNetworkEndpoints()
|
|
}
|
|
|
|
return 0
|
|
}
|
|
|
|
func initNetmapService(ctx context.Context, c *cfg) {
|
|
network.WriteToNodeInfo(c.localAddr, &c.cfgNodeInfo.localInfo)
|
|
c.cfgNodeInfo.localInfo.SetPublicKey(c.key.PublicKey().Bytes())
|
|
parseAttributes(c)
|
|
c.cfgNodeInfo.localInfo.SetStatus(netmapSDK.Offline)
|
|
|
|
c.initMorphComponents(ctx)
|
|
|
|
initNetmapState(c)
|
|
|
|
server := netmapTransportGRPC.New(
|
|
netmapService.NewSignService(
|
|
&c.key.PrivateKey,
|
|
netmapService.NewExecutionService(
|
|
c,
|
|
c.apiVersion,
|
|
&netInfo{
|
|
netState: c.cfgNetmap.state,
|
|
magic: c.cfgMorph.client,
|
|
morphClientNetMap: c.cfgNetmap.wrapper,
|
|
msPerBlockRdr: c.cfgMorph.client.MsPerBlock,
|
|
},
|
|
c.respSvc,
|
|
),
|
|
),
|
|
)
|
|
|
|
c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) {
|
|
netmapGRPC.RegisterNetmapServiceServer(s, server)
|
|
})
|
|
|
|
addNewEpochNotificationHandlers(c)
|
|
}
|
|
|
|
func addNewEpochNotificationHandlers(c *cfg) {
|
|
addNewEpochNotificationHandler(c, func(ev event.Event) {
|
|
c.cfgNetmap.state.setCurrentEpoch(ev.(netmapEvent.NewEpoch).EpochNumber())
|
|
})
|
|
|
|
addNewEpochAsyncNotificationHandler(c, func(ev event.Event) {
|
|
e := ev.(netmapEvent.NewEpoch).EpochNumber()
|
|
|
|
c.updateContractNodeInfo(e)
|
|
|
|
if !c.needBootstrap() || c.cfgNetmap.reBoostrapTurnedOff.Load() { // fixes #470
|
|
return
|
|
}
|
|
|
|
if err := c.bootstrap(); err != nil {
|
|
c.log.Warn(logs.FrostFSNodeCantSendRebootstrapTx, zap.Error(err))
|
|
}
|
|
})
|
|
|
|
if c.cfgMorph.notaryEnabled {
|
|
addNewEpochAsyncNotificationHandler(c, func(_ event.Event) {
|
|
_, err := makeNotaryDeposit(c)
|
|
if err != nil {
|
|
c.log.Error(logs.FrostFSNodeCouldNotMakeNotaryDeposit,
|
|
zap.String("error", err.Error()),
|
|
)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
// bootstrapNode adds current node to the Network map.
|
|
// Must be called after initNetmapService.
|
|
func bootstrapNode(c *cfg) {
|
|
if c.needBootstrap() {
|
|
if c.IsMaintenance() {
|
|
c.log.Info(logs.FrostFSNodeNodeIsUnderMaintenanceSkipInitialBootstrap)
|
|
return
|
|
}
|
|
err := c.bootstrap()
|
|
fatalOnErrDetails("bootstrap error", err)
|
|
}
|
|
}
|
|
|
|
func addNetmapNotificationHandler(c *cfg, sTyp string, h event.Handler) {
|
|
typ := event.TypeFromString(sTyp)
|
|
|
|
if c.cfgNetmap.subscribers == nil {
|
|
c.cfgNetmap.subscribers = make(map[event.Type][]event.Handler, 1)
|
|
}
|
|
|
|
c.cfgNetmap.subscribers[typ] = append(c.cfgNetmap.subscribers[typ], h)
|
|
}
|
|
|
|
func setNetmapNotificationParser(c *cfg, sTyp string, p event.NotificationParser) {
|
|
typ := event.TypeFromString(sTyp)
|
|
|
|
if c.cfgNetmap.parsers == nil {
|
|
c.cfgNetmap.parsers = make(map[event.Type]event.NotificationParser, 1)
|
|
}
|
|
|
|
c.cfgNetmap.parsers[typ] = p
|
|
}
|
|
|
|
// initNetmapState inits current Network map state.
|
|
// Must be called after Morph components initialization.
|
|
func initNetmapState(c *cfg) {
|
|
epoch, err := c.cfgNetmap.wrapper.Epoch()
|
|
fatalOnErrDetails("could not initialize current epoch number", err)
|
|
|
|
var ni *netmapSDK.NodeInfo
|
|
ni, err = c.netmapInitLocalNodeState(epoch)
|
|
fatalOnErrDetails("could not init network state", err)
|
|
|
|
stateWord := nodeState(ni)
|
|
|
|
c.log.Info(logs.FrostFSNodeInitialNetworkState,
|
|
zap.Uint64("epoch", epoch),
|
|
zap.String("state", stateWord),
|
|
)
|
|
|
|
if ni != nil && ni.Status().IsMaintenance() {
|
|
c.isMaintenance.Store(true)
|
|
}
|
|
|
|
c.cfgNetmap.state.setCurrentEpoch(epoch)
|
|
c.setContractNodeInfo(ni)
|
|
}
|
|
|
|
func nodeState(ni *netmapSDK.NodeInfo) string {
|
|
if ni != nil {
|
|
switch ni.Status() {
|
|
case netmapSDK.Online:
|
|
return "online"
|
|
case netmapSDK.Offline:
|
|
return "offline"
|
|
case netmapSDK.Maintenance:
|
|
return "maintenance"
|
|
case netmapSDK.UnspecifiedState:
|
|
return "undefined"
|
|
}
|
|
}
|
|
return "undefined"
|
|
}
|
|
|
|
func (c *cfg) netmapInitLocalNodeState(epoch uint64) (*netmapSDK.NodeInfo, error) {
|
|
nmNodes, err := c.cfgNetmap.wrapper.GetCandidates()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var candidate *netmapSDK.NodeInfo
|
|
for i := range nmNodes {
|
|
if bytes.Equal(nmNodes[i].PublicKey(), c.binPublicKey) {
|
|
candidate = &nmNodes[i]
|
|
break
|
|
}
|
|
}
|
|
|
|
node, err := c.netmapLocalNodeState(epoch)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if candidate == nil {
|
|
return node, nil
|
|
}
|
|
|
|
nmState := nodeState(node)
|
|
candidateState := nodeState(candidate)
|
|
if nmState != candidateState {
|
|
// This happens when the node was switched to maintenance without epoch tick.
|
|
// We expect it to continue staying in maintenance.
|
|
c.log.Info(logs.CandidateStatusPriority,
|
|
zap.String("netmap", nmState),
|
|
zap.String("candidate", candidateState))
|
|
}
|
|
return candidate, nil
|
|
}
|
|
|
|
func (c *cfg) netmapLocalNodeState(epoch uint64) (*netmapSDK.NodeInfo, error) {
|
|
// calculate current network state
|
|
nm, err := c.cfgNetmap.wrapper.GetNetMapByEpoch(epoch)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
c.netMap.Store(*nm)
|
|
|
|
nmNodes := nm.Nodes()
|
|
for i := range nmNodes {
|
|
if bytes.Equal(nmNodes[i].PublicKey(), c.binPublicKey) {
|
|
return &nmNodes[i], nil
|
|
}
|
|
}
|
|
|
|
return nil, nil
|
|
}
|
|
|
|
// addNewEpochNotificationHandler adds handler that will be executed synchronously.
|
|
func addNewEpochNotificationHandler(c *cfg, h event.Handler) {
|
|
addNetmapNotificationHandler(c, newEpochNotification, h)
|
|
}
|
|
|
|
// addNewEpochAsyncNotificationHandler adds handler that will be executed asynchronously via netmap workerPool.
|
|
func addNewEpochAsyncNotificationHandler(c *cfg, h event.Handler) {
|
|
addNetmapNotificationHandler(
|
|
c,
|
|
newEpochNotification,
|
|
event.WorkerPoolHandler(
|
|
c.cfgNetmap.workerPool,
|
|
h,
|
|
c.log,
|
|
),
|
|
)
|
|
}
|
|
|
|
var errRelayBootstrap = errors.New("setting netmap status is forbidden in relay mode")
|
|
|
|
func (c *cfg) SetNetmapStatus(st control.NetmapStatus) error {
|
|
switch st {
|
|
default:
|
|
return fmt.Errorf("unsupported status %v", st)
|
|
case control.NetmapStatus_MAINTENANCE:
|
|
return c.setMaintenanceStatus(false)
|
|
case control.NetmapStatus_ONLINE, control.NetmapStatus_OFFLINE:
|
|
}
|
|
|
|
c.stopMaintenance()
|
|
|
|
if !c.needBootstrap() {
|
|
return errRelayBootstrap
|
|
}
|
|
|
|
if st == control.NetmapStatus_ONLINE {
|
|
c.cfgNetmap.reBoostrapTurnedOff.Store(false)
|
|
return bootstrapOnline(c)
|
|
}
|
|
|
|
c.cfgNetmap.reBoostrapTurnedOff.Store(true)
|
|
|
|
return c.updateNetMapState(func(*nmClient.UpdatePeerPrm) {})
|
|
}
|
|
|
|
func (c *cfg) GetNetmapStatus() (control.NetmapStatus, uint64, error) {
|
|
epoch, err := c.netMapSource.Epoch()
|
|
if err != nil {
|
|
return control.NetmapStatus_STATUS_UNDEFINED, 0, fmt.Errorf("failed to get current epoch: %w", err)
|
|
}
|
|
st := c.NetmapStatus()
|
|
return st, epoch, nil
|
|
}
|
|
|
|
func (c *cfg) ForceMaintenance() error {
|
|
return c.setMaintenanceStatus(true)
|
|
}
|
|
|
|
func (c *cfg) setMaintenanceStatus(force bool) error {
|
|
netSettings, err := c.cfgNetmap.wrapper.ReadNetworkConfiguration()
|
|
if err != nil {
|
|
err = fmt.Errorf("read network settings to check maintenance allowance: %w", err)
|
|
} else if !netSettings.MaintenanceModeAllowed {
|
|
err = errors.New("maintenance mode is not allowed by the network")
|
|
}
|
|
|
|
if err == nil || force {
|
|
c.startMaintenance()
|
|
|
|
if err == nil {
|
|
err = c.updateNetMapState((*nmClient.UpdatePeerPrm).SetMaintenance)
|
|
}
|
|
|
|
if err != nil {
|
|
return fmt.Errorf("local maintenance is started, but state is not updated in the network: %w", err)
|
|
}
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
// calls UpdatePeerState operation of Netmap contract's client for the local node.
|
|
// State setter is used to specify node state to switch to.
|
|
func (c *cfg) updateNetMapState(stateSetter func(*nmClient.UpdatePeerPrm)) error {
|
|
var prm nmClient.UpdatePeerPrm
|
|
prm.SetKey(c.key.PublicKey().Bytes())
|
|
stateSetter(&prm)
|
|
|
|
_, err := c.cfgNetmap.wrapper.UpdatePeerState(prm)
|
|
return err
|
|
}
|
|
|
|
type netInfo struct {
|
|
netState netmap.State
|
|
|
|
magic interface {
|
|
MagicNumber() (uint64, error)
|
|
}
|
|
|
|
morphClientNetMap *nmClient.Client
|
|
|
|
msPerBlockRdr func() (int64, error)
|
|
}
|
|
|
|
func (n *netInfo) Dump(ver version.Version) (*netmapSDK.NetworkInfo, error) {
|
|
magic, err := n.magic.MagicNumber()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var ni netmapSDK.NetworkInfo
|
|
ni.SetCurrentEpoch(n.netState.CurrentEpoch())
|
|
ni.SetMagicNumber(magic)
|
|
|
|
netInfoMorph, err := n.morphClientNetMap.ReadNetworkConfiguration()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("read network configuration using netmap contract client: %w", err)
|
|
}
|
|
|
|
if mjr := ver.Major(); mjr > 2 || mjr == 2 && ver.Minor() > 9 {
|
|
msPerBlock, err := n.msPerBlockRdr()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("ms per block: %w", err)
|
|
}
|
|
|
|
ni.SetMsPerBlock(msPerBlock)
|
|
|
|
ni.SetMaxObjectSize(netInfoMorph.MaxObjectSize)
|
|
ni.SetEpochDuration(netInfoMorph.EpochDuration)
|
|
ni.SetContainerFee(netInfoMorph.ContainerFee)
|
|
ni.SetNamedContainerFee(netInfoMorph.ContainerAliasFee)
|
|
ni.SetIRCandidateFee(netInfoMorph.IRCandidateFee)
|
|
ni.SetWithdrawalFee(netInfoMorph.WithdrawalFee)
|
|
|
|
if netInfoMorph.HomomorphicHashingDisabled {
|
|
ni.DisableHomomorphicHashing()
|
|
}
|
|
|
|
if netInfoMorph.MaintenanceModeAllowed {
|
|
ni.AllowMaintenanceMode()
|
|
}
|
|
|
|
for i := range netInfoMorph.Raw {
|
|
ni.SetRawNetworkParameter(netInfoMorph.Raw[i].Name, netInfoMorph.Raw[i].Value)
|
|
}
|
|
}
|
|
|
|
return &ni, nil
|
|
}
|