forked from TrueCloudLab/frostfs-node
[#398] cmd/node: Serve NetmapService.NetworkInfo RPC
Implement `NetworkInfo` calls on full stack of Netmap services. Current epoch is read from node local state, magic number is read via `MagicNumber` call of morph client. Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
parent
4c8d29ce46
commit
9073e198b9
7 changed files with 102 additions and 11 deletions
|
@ -3,9 +3,11 @@ package main
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-api-go/pkg/netmap"
|
netmapSDK "github.com/nspcc-dev/neofs-api-go/pkg/netmap"
|
||||||
|
netmapV2 "github.com/nspcc-dev/neofs-api-go/v2/netmap"
|
||||||
netmapGRPC "github.com/nspcc-dev/neofs-api-go/v2/netmap/grpc"
|
netmapGRPC "github.com/nspcc-dev/neofs-api-go/v2/netmap/grpc"
|
||||||
crypto "github.com/nspcc-dev/neofs-crypto"
|
crypto "github.com/nspcc-dev/neofs-crypto"
|
||||||
|
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/morph/event"
|
"github.com/nspcc-dev/neofs-node/pkg/morph/event"
|
||||||
netmapEvent "github.com/nspcc-dev/neofs-node/pkg/morph/event/netmap"
|
netmapEvent "github.com/nspcc-dev/neofs-node/pkg/morph/event/netmap"
|
||||||
netmapTransportGRPC "github.com/nspcc-dev/neofs-node/pkg/network/transport/netmap/grpc"
|
netmapTransportGRPC "github.com/nspcc-dev/neofs-node/pkg/network/transport/netmap/grpc"
|
||||||
|
@ -36,14 +38,18 @@ func (s *networkState) setCurrentEpoch(v uint64) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func initNetmapService(c *cfg) {
|
func initNetmapService(c *cfg) {
|
||||||
peerInfo := new(netmap.NodeInfo)
|
peerInfo := new(netmapSDK.NodeInfo)
|
||||||
peerInfo.SetAddress(c.localAddr.String())
|
peerInfo.SetAddress(c.localAddr.String())
|
||||||
peerInfo.SetPublicKey(crypto.MarshalPublicKey(&c.key.PublicKey))
|
peerInfo.SetPublicKey(crypto.MarshalPublicKey(&c.key.PublicKey))
|
||||||
peerInfo.SetAttributes(c.cfgNodeInfo.attributes...)
|
peerInfo.SetAttributes(c.cfgNodeInfo.attributes...)
|
||||||
peerInfo.SetState(netmap.NodeStateOffline)
|
peerInfo.SetState(netmapSDK.NodeStateOffline)
|
||||||
|
|
||||||
c.handleLocalNodeInfo(peerInfo)
|
c.handleLocalNodeInfo(peerInfo)
|
||||||
|
|
||||||
|
if c.cfgMorph.client == nil {
|
||||||
|
initMorphComponents(c)
|
||||||
|
}
|
||||||
|
|
||||||
netmapGRPC.RegisterNetmapServiceServer(c.cfgGRPC.server,
|
netmapGRPC.RegisterNetmapServiceServer(c.cfgGRPC.server,
|
||||||
netmapTransportGRPC.New(
|
netmapTransportGRPC.New(
|
||||||
netmapService.NewSignService(
|
netmapService.NewSignService(
|
||||||
|
@ -52,6 +58,10 @@ func initNetmapService(c *cfg) {
|
||||||
netmapService.NewExecutionService(
|
netmapService.NewExecutionService(
|
||||||
c,
|
c,
|
||||||
c.apiVersion,
|
c.apiVersion,
|
||||||
|
&netInfo{
|
||||||
|
netState: c.cfgNetmap.state,
|
||||||
|
magic: c.cfgMorph.client,
|
||||||
|
},
|
||||||
),
|
),
|
||||||
c.respSvc,
|
c.respSvc,
|
||||||
),
|
),
|
||||||
|
@ -137,7 +147,7 @@ func initState(c *cfg) {
|
||||||
c.cfgNetmap.state.setCurrentEpoch(epoch)
|
c.cfgNetmap.state.setCurrentEpoch(epoch)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cfg) netmapLocalNodeState(epoch uint64) (*netmap.NodeInfo, error) {
|
func (c *cfg) netmapLocalNodeState(epoch uint64) (*netmapSDK.NodeInfo, error) {
|
||||||
// calculate current network state
|
// calculate current network state
|
||||||
nm, err := c.cfgNetmap.wrapper.GetNetMapByEpoch(epoch)
|
nm, err := c.cfgNetmap.wrapper.GetNetMapByEpoch(epoch)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -147,7 +157,7 @@ func (c *cfg) netmapLocalNodeState(epoch uint64) (*netmap.NodeInfo, error) {
|
||||||
return c.localNodeInfoFromNetmap(nm), nil
|
return c.localNodeInfoFromNetmap(nm), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cfg) localNodeInfoFromNetmap(nm *netmap.Netmap) *netmap.NodeInfo {
|
func (c *cfg) localNodeInfoFromNetmap(nm *netmapSDK.Netmap) *netmapSDK.NodeInfo {
|
||||||
for _, n := range nm.Nodes {
|
for _, n := range nm.Nodes {
|
||||||
if bytes.Equal(n.PublicKey(), crypto.MarshalPublicKey(&c.key.PublicKey)) {
|
if bytes.Equal(n.PublicKey(), crypto.MarshalPublicKey(&c.key.PublicKey)) {
|
||||||
return n.NodeInfo
|
return n.NodeInfo
|
||||||
|
@ -164,7 +174,7 @@ func addNewEpochNotificationHandler(c *cfg, h event.Handler) {
|
||||||
func goOffline(c *cfg) {
|
func goOffline(c *cfg) {
|
||||||
err := c.cfgNetmap.wrapper.UpdatePeerState(
|
err := c.cfgNetmap.wrapper.UpdatePeerState(
|
||||||
crypto.MarshalPublicKey(&c.key.PublicKey),
|
crypto.MarshalPublicKey(&c.key.PublicKey),
|
||||||
netmap.NodeStateOffline,
|
netmapSDK.NodeStateOffline,
|
||||||
)
|
)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -181,10 +191,10 @@ func (c *cfg) SetNetmapStatus(st control.NetmapStatus) error {
|
||||||
return c.cfgNetmap.wrapper.AddPeer(c.toOnlineLocalNodeInfo())
|
return c.cfgNetmap.wrapper.AddPeer(c.toOnlineLocalNodeInfo())
|
||||||
}
|
}
|
||||||
|
|
||||||
var apiState netmap.NodeState
|
var apiState netmapSDK.NodeState
|
||||||
|
|
||||||
if st == control.NetmapStatus_OFFLINE {
|
if st == control.NetmapStatus_OFFLINE {
|
||||||
apiState = netmap.NodeStateOffline
|
apiState = netmapSDK.NodeStateOffline
|
||||||
}
|
}
|
||||||
|
|
||||||
return c.cfgNetmap.wrapper.UpdatePeerState(
|
return c.cfgNetmap.wrapper.UpdatePeerState(
|
||||||
|
@ -192,3 +202,19 @@ func (c *cfg) SetNetmapStatus(st control.NetmapStatus) error {
|
||||||
apiState,
|
apiState,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type netInfo struct {
|
||||||
|
netState netmap.State
|
||||||
|
|
||||||
|
magic interface {
|
||||||
|
MagicNumber() uint64
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *netInfo) Dump() (*netmapV2.NetworkInfo, error) {
|
||||||
|
ni := new(netmapV2.NetworkInfo)
|
||||||
|
ni.SetCurrentEpoch(n.netState.CurrentEpoch())
|
||||||
|
ni.SetMagicNumber(n.magic.MagicNumber())
|
||||||
|
|
||||||
|
return ni, nil
|
||||||
|
}
|
||||||
|
|
2
go.mod
2
go.mod
|
@ -16,7 +16,7 @@ require (
|
||||||
github.com/multiformats/go-multihash v0.0.13 // indirect
|
github.com/multiformats/go-multihash v0.0.13 // indirect
|
||||||
github.com/nspcc-dev/hrw v1.0.9
|
github.com/nspcc-dev/hrw v1.0.9
|
||||||
github.com/nspcc-dev/neo-go v0.93.0
|
github.com/nspcc-dev/neo-go v0.93.0
|
||||||
github.com/nspcc-dev/neofs-api-go v1.23.0
|
github.com/nspcc-dev/neofs-api-go v1.23.1-0.20210219132553-e2b0887be93d
|
||||||
github.com/nspcc-dev/neofs-crypto v0.3.0
|
github.com/nspcc-dev/neofs-crypto v0.3.0
|
||||||
github.com/nspcc-dev/tzhash v1.4.0
|
github.com/nspcc-dev/tzhash v1.4.0
|
||||||
github.com/panjf2000/ants/v2 v2.3.0
|
github.com/panjf2000/ants/v2 v2.3.0
|
||||||
|
|
BIN
go.sum
BIN
go.sum
Binary file not shown.
|
@ -32,3 +32,14 @@ func (s *Server) LocalNodeInfo(
|
||||||
|
|
||||||
return netmap.LocalNodeInfoResponseToGRPCMessage(resp), nil
|
return netmap.LocalNodeInfoResponseToGRPCMessage(resp), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NetworkInfo converts gRPC request message and passes it to internal netmap service.
|
||||||
|
func (s *Server) NetworkInfo(ctx context.Context, req *netmapGRPC.NetworkInfoRequest) (*netmapGRPC.NetworkInfoResponse, error) {
|
||||||
|
resp, err := s.srv.NetworkInfo(ctx, netmap.NetworkInfoRequestFromGRPCMessage(req))
|
||||||
|
if err != nil {
|
||||||
|
// TODO: think about how we transport errors through gRPC
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return netmap.NetworkInfoResponseToGRPCMessage(resp), nil
|
||||||
|
}
|
||||||
|
|
|
@ -10,6 +10,8 @@ import (
|
||||||
type executorSvc struct {
|
type executorSvc struct {
|
||||||
version *pkg.Version
|
version *pkg.Version
|
||||||
state NodeState
|
state NodeState
|
||||||
|
|
||||||
|
netInfo NetworkInfo
|
||||||
}
|
}
|
||||||
|
|
||||||
// NodeState encapsulates information
|
// NodeState encapsulates information
|
||||||
|
@ -20,8 +22,16 @@ type NodeState interface {
|
||||||
LocalNodeInfo() (*netmap.NodeInfo, error)
|
LocalNodeInfo() (*netmap.NodeInfo, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewExecutionService(s NodeState, v *pkg.Version) netmap.Service {
|
// NetworkInfo encapsulates source of the
|
||||||
if s == nil || v == nil {
|
// recent information about the NeoFS network.
|
||||||
|
type NetworkInfo interface {
|
||||||
|
// Must return recent network information.
|
||||||
|
// in NeoFS API v2 NetworkInfo structure.
|
||||||
|
Dump() (*netmap.NetworkInfo, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewExecutionService(s NodeState, v *pkg.Version, netInfo NetworkInfo) netmap.Service {
|
||||||
|
if s == nil || v == nil || netInfo == nil {
|
||||||
// this should never happen, otherwise it programmers bug
|
// this should never happen, otherwise it programmers bug
|
||||||
panic("can't create netmap execution service")
|
panic("can't create netmap execution service")
|
||||||
}
|
}
|
||||||
|
@ -29,6 +39,7 @@ func NewExecutionService(s NodeState, v *pkg.Version) netmap.Service {
|
||||||
return &executorSvc{
|
return &executorSvc{
|
||||||
version: v,
|
version: v,
|
||||||
state: s,
|
state: s,
|
||||||
|
netInfo: netInfo,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -49,3 +60,20 @@ func (s *executorSvc) LocalNodeInfo(
|
||||||
|
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *executorSvc) NetworkInfo(
|
||||||
|
_ context.Context,
|
||||||
|
_ *netmap.NetworkInfoRequest) (*netmap.NetworkInfoResponse, error) {
|
||||||
|
ni, err := s.netInfo.Dump()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
body := new(netmap.NetworkInfoResponseBody)
|
||||||
|
body.SetNetworkInfo(ni)
|
||||||
|
|
||||||
|
resp := new(netmap.NetworkInfoResponse)
|
||||||
|
resp.SetBody(body)
|
||||||
|
|
||||||
|
return resp, nil
|
||||||
|
}
|
||||||
|
|
|
@ -35,3 +35,16 @@ func (s *responseService) LocalNodeInfo(ctx context.Context, req *netmap.LocalNo
|
||||||
|
|
||||||
return resp.(*netmap.LocalNodeInfoResponse), nil
|
return resp.(*netmap.LocalNodeInfoResponse), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *responseService) NetworkInfo(ctx context.Context, req *netmap.NetworkInfoRequest) (*netmap.NetworkInfoResponse, error) {
|
||||||
|
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
|
||||||
|
func(ctx context.Context, req interface{}) (util.ResponseMessage, error) {
|
||||||
|
return s.svc.NetworkInfo(ctx, req.(*netmap.NetworkInfoRequest))
|
||||||
|
},
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return resp.(*netmap.NetworkInfoResponse), nil
|
||||||
|
}
|
||||||
|
|
|
@ -35,3 +35,16 @@ func (s *signService) LocalNodeInfo(
|
||||||
|
|
||||||
return resp.(*netmap.LocalNodeInfoResponse), nil
|
return resp.(*netmap.LocalNodeInfoResponse), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *signService) NetworkInfo(ctx context.Context, req *netmap.NetworkInfoRequest) (*netmap.NetworkInfoResponse, error) {
|
||||||
|
resp, err := s.sigSvc.HandleUnaryRequest(ctx, req,
|
||||||
|
func(ctx context.Context, req interface{}) (util.ResponseMessage, error) {
|
||||||
|
return s.svc.NetworkInfo(ctx, req.(*netmap.NetworkInfoRequest))
|
||||||
|
},
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return resp.(*netmap.NetworkInfoResponse), nil
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue