[#1793] node: Serve NetmapService.NetmapSnapshot RPC

There is no more need to serve the same request on Control API.

Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
Leonard Lyubich 2022-09-17 16:37:01 +04:00 committed by fyrchik
parent 59de20fbba
commit 485a5418d2
33 changed files with 176 additions and 289 deletions

View file

@ -4,6 +4,8 @@ Changelog for NeoFS Node
## [Unreleased] - Anmado (안마도, 鞍馬島)
### Added
- Serving `NetmapService.NetmapSnapshot` RPC (#1793)
- `netmap snapshot` command of NeoFS CLI (#1793)
- Changelog updates CI step (#1808)
- Validate storage node configuration before node startup (#1805)
@ -18,10 +20,13 @@ Changelog for NeoFS Node
### Removed
- Remove WIF and NEP2 support in `neofs-cli`'s --wallet flag (#1128)
- Remove --generate-key option in `neofs-cli container delete` (#1692)
- Serving `ControlService.NetmapSnapshot` RPC (#1793)
- `control netmap-snapshot` command of NeoFS CLI (#1793)
### Updated
### Updating from v0.32.0
Replace using the `control netmap-snapshot` command with `netmap snapshot` one in NeoFS CLI.
## [0.32.0] - 2022-09-14 - Pungdo (풍도, 楓島)

View file

@ -272,6 +272,29 @@ func NodeInfo(prm NodeInfoPrm) (res NodeInfoRes, err error) {
return
}
// NetMapSnapshotPrm groups parameters of NetMapSnapshot operation.
type NetMapSnapshotPrm struct {
commonPrm
}
// NetMapSnapshotRes groups the resulting values of NetMapSnapshot operation.
type NetMapSnapshotRes struct {
cliRes *client.ResNetMapSnapshot
}
// NetMap returns current local snapshot of the NeoFS network map.
func (x NetMapSnapshotRes) NetMap() netmap.NetMap {
return x.cliRes.NetMap()
}
// NetMapSnapshot requests current network view of the remote server.
//
// Returns any error which prevented the operation from completing correctly in error return.
func NetMapSnapshot(prm NetMapSnapshotPrm) (res NetMapSnapshotRes, err error) {
res.cliRes, err = prm.cli.NetMapSnapshot(context.Background(), client.PrmNetMapSnapshot{})
return
}
// CreateSessionPrm groups parameters of CreateSession operation.
type CreateSessionPrm struct {
commonPrm

View file

@ -30,7 +30,6 @@ func init() {
healthCheckCmd,
setNetmapStatusCmd,
dropObjectsCmd,
snapshotCmd,
shardsCmd,
synchronizeTreeCmd,
)
@ -38,7 +37,6 @@ func init() {
initControlHealthCheckCmd()
initControlSetNetmapStatusCmd()
initControlDropObjectsCmd()
initControlSnapshotCmd()
initControlShardsCmd()
initControlSynchronizeTreeCmd()
}

View file

@ -1,74 +0,0 @@
package control
import (
"github.com/mr-tron/base58"
rawclient "github.com/nspcc-dev/neofs-api-go/v2/rpc/client"
"github.com/nspcc-dev/neofs-node/cmd/neofs-cli/internal/common"
"github.com/nspcc-dev/neofs-node/cmd/neofs-cli/internal/commonflags"
"github.com/nspcc-dev/neofs-node/cmd/neofs-cli/internal/key"
"github.com/nspcc-dev/neofs-node/pkg/services/control"
"github.com/spf13/cobra"
)
const (
netmapSnapshotJSONFlag = commonflags.JSON
)
var snapshotCmd = &cobra.Command{
Use: "netmap-snapshot",
Short: "Get network map snapshot",
Long: "Get network map snapshot",
Run: func(cmd *cobra.Command, args []string) {
pk := key.Get(cmd)
req := new(control.NetmapSnapshotRequest)
req.SetBody(new(control.NetmapSnapshotRequest_Body))
signRequest(cmd, pk, req)
cli := getClient(cmd, pk)
var resp *control.NetmapSnapshotResponse
var err error
err = cli.ExecRaw(func(client *rawclient.Client) error {
resp, err = control.NetmapSnapshot(client, req)
return err
})
common.ExitOnErr(cmd, "rpc error: %w", err)
verifyResponse(cmd, resp.GetSignature(), resp.GetBody())
isJSON, _ := cmd.Flags().GetBool(netmapSnapshotJSONFlag)
prettyPrintNetmap(cmd, resp.GetBody().GetNetmap(), isJSON)
},
}
func initControlSnapshotCmd() {
commonflags.InitWithoutRPC(snapshotCmd)
flags := snapshotCmd.Flags()
flags.String(controlRPC, controlRPCDefault, controlRPCUsage)
flags.Bool(netmapSnapshotJSONFlag, false, "print netmap structure in JSON format")
}
func prettyPrintNetmap(cmd *cobra.Command, nm *control.Netmap, jsonEncoding bool) {
if jsonEncoding {
common.PrettyPrintJSON(cmd, nm, "netmap")
return
}
cmd.Println("Epoch:", nm.GetEpoch())
for i, node := range nm.GetNodes() {
cmd.Printf("Node %d: %s %s %v\n", i+1,
base58.Encode(node.GetPublicKey()),
node.GetState(),
node.GetAddresses(),
)
for _, attr := range node.GetAttributes() {
cmd.Printf("\t%s: %s\n", attr.GetKey(), attr.GetValue())
}
}
}

View file

@ -22,9 +22,11 @@ func init() {
getEpochCmd,
nodeInfoCmd,
netInfoCmd,
snapshotCmd,
)
initGetEpochCmd()
initNetInfoCmd()
initNodeInfoCmd()
initSnapshotCmd()
}

View file

@ -0,0 +1,63 @@
package netmap
import (
"encoding/hex"
internalclient "github.com/nspcc-dev/neofs-node/cmd/neofs-cli/internal/client"
"github.com/nspcc-dev/neofs-node/cmd/neofs-cli/internal/common"
"github.com/nspcc-dev/neofs-node/cmd/neofs-cli/internal/commonflags"
"github.com/nspcc-dev/neofs-node/cmd/neofs-cli/internal/key"
"github.com/nspcc-dev/neofs-sdk-go/netmap"
"github.com/spf13/cobra"
)
var snapshotCmd = &cobra.Command{
Use: "snapshot",
Short: "Request current local snapshot of the network map",
Long: `Request current local snapshot of the network map`,
Run: func(cmd *cobra.Command, args []string) {
p := key.GetOrGenerate(cmd)
cli := internalclient.GetSDKClientByFlag(cmd, p, commonflags.RPC)
var prm internalclient.NetMapSnapshotPrm
prm.SetClient(cli)
res, err := internalclient.NetMapSnapshot(prm)
common.ExitOnErr(cmd, "rpc error: %w", err)
prettyPrintNetMap(cmd, res.NetMap())
},
}
func initSnapshotCmd() {
commonflags.Init(snapshotCmd)
commonflags.InitAPI(snapshotCmd)
}
func prettyPrintNetMap(cmd *cobra.Command, nm netmap.NetMap) {
cmd.Println("Epoch:", nm.Epoch())
nodes := nm.Nodes()
for i := range nodes {
var strState string
switch {
case nodes[i].IsOnline():
strState = "ONLINE"
case nodes[i].IsOffline():
strState = "OFFLINE"
}
cmd.Printf("Node %d: %s %s ", i+1, hex.EncodeToString(nodes[i].PublicKey()), strState)
netmap.IterateNetworkEndpoints(nodes[i], func(endpoint string) {
cmd.Printf("%s ", endpoint)
})
cmd.Println()
nodes[i].IterateAttributes(func(key, value string) {
cmd.Printf("\t%s: %s\n", key, value)
})
}
}

View file

@ -6,6 +6,7 @@ import (
"net"
"path/filepath"
"sync"
atomicstd "sync/atomic"
"time"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
@ -134,6 +135,13 @@ type cfg struct {
persistate *state.PersistentStorage
netMapSource netmapCore.Source
// current network map
netMap atomicstd.Value // type netmap.NetMap
}
func (c *cfg) ProcessCurrentNetMap(f func(netmap.NetMap)) {
f(c.netMap.Load().(netmap.NetMap))
}
type cfgGRPC struct {

View file

@ -286,6 +286,8 @@ func (c *cfg) netmapLocalNodeState(epoch uint64) (*netmapSDK.NodeInfo, error) {
return nil, err
}
c.netMap.Store(*nm)
nmNodes := nm.Nodes()
for i := range nmNodes {
if bytes.Equal(nmNodes[i].PublicKey(), c.binPublicKey) {

4
go.mod
View file

@ -17,9 +17,9 @@ require (
github.com/nspcc-dev/hrw v1.0.9
github.com/nspcc-dev/neo-go v0.99.2
github.com/nspcc-dev/neo-go/pkg/interop v0.0.0-20220809123759-3094d3e0c14b // indirect
github.com/nspcc-dev/neofs-api-go/v2 v2.13.2-0.20220827080658-9e17cdfc7647
github.com/nspcc-dev/neofs-api-go/v2 v2.13.2-0.20220919124434-cf868188ef9c
github.com/nspcc-dev/neofs-contract v0.15.5
github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.6.0.20220914073456-f75a5feba335
github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.6.0.20220922065107-8e3173eacd23
github.com/nspcc-dev/tzhash v1.6.1
github.com/panjf2000/ants/v2 v2.4.0
github.com/paulmach/orb v0.2.2

BIN
go.sum

Binary file not shown.

View file

@ -28,7 +28,12 @@ func (c *Client) GetNetMapByEpoch(epoch uint64) (*netmap.NetMap, error) {
epochSnapshotMethod, err)
}
return unmarshalNetmap(res, epochSnapshotMethod)
nm, err := unmarshalNetmap(res, epochSnapshotMethod)
if err == nil {
nm.SetEpoch(epoch)
}
return nm, err
}
// GetCandidates receives information list about candidates

View file

@ -52,3 +52,18 @@ func (s *Server) NetworkInfo(ctx context.Context, req *netmapGRPC.NetworkInfoReq
return resp.ToGRPCMessage().(*netmapGRPC.NetworkInfoResponse), nil
}
// NetmapSnapshot converts gRPC request message and passes it to internal netmap service.
func (s *Server) NetmapSnapshot(ctx context.Context, req *netmapGRPC.NetmapSnapshotRequest) (*netmapGRPC.NetmapSnapshotResponse, error) {
snapshotReq := new(netmap.SnapshotRequest)
if err := snapshotReq.FromGRPCMessage(req); err != nil {
return nil, err
}
resp, err := s.srv.Snapshot(ctx, snapshotReq)
if err != nil {
return nil, err
}
return resp.ToGRPCMessage().(*netmapGRPC.NetmapSnapshotResponse), nil
}

View file

@ -33,26 +33,6 @@ func (w *healthCheckResponseWrapper) FromGRPCMessage(m grpc.Message) error {
return nil
}
type netmapSnapshotResponseWrapper struct {
message.Message
m *NetmapSnapshotResponse
}
func (w *netmapSnapshotResponseWrapper) ToGRPCMessage() grpc.Message {
return w.m
}
func (w *netmapSnapshotResponseWrapper) FromGRPCMessage(m grpc.Message) error {
var ok bool
w.m, ok = m.(*NetmapSnapshotResponse)
if !ok {
return message.NewUnexpectedMessageType(m, w.m)
}
return nil
}
type setNetmapStatusResponseWrapper struct {
message.Message
m *SetNetmapStatusResponse

Binary file not shown.

Binary file not shown.

Binary file not shown.

View file

@ -9,7 +9,6 @@ const serviceName = "control.ControlService"
const (
rpcHealthCheck = "HealthCheck"
rpcNetmapSnapshot = "NetmapSnapshot"
rpcSetNetmapStatus = "SetNetmapStatus"
rpcDropObjects = "DropObjects"
rpcListShards = "ListShards"
@ -42,28 +41,6 @@ func HealthCheck(
return wResp.m, nil
}
// NetmapSnapshot executes ControlService.NetmapSnapshot RPC.
func NetmapSnapshot(
cli *client.Client,
req *NetmapSnapshotRequest,
opts ...client.CallOption,
) (*NetmapSnapshotResponse, error) {
wResp := &netmapSnapshotResponseWrapper{
m: new(NetmapSnapshotResponse),
}
wReq := &requestWrapper{
m: req,
}
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceName, rpcNetmapSnapshot), wReq, wResp, opts...)
if err != nil {
return nil, err
}
return wResp.m, nil
}
// SetNetmapStatus executes ControlService.SetNetmapStatus RPC.
func SetNetmapStatus(
cli *client.Client,

View file

@ -1,91 +0,0 @@
package control
import (
"context"
"github.com/nspcc-dev/neofs-node/pkg/services/control"
netmapAPI "github.com/nspcc-dev/neofs-sdk-go/netmap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// NetmapSnapshot reads network map snapshot from Netmap storage.
func (s *Server) NetmapSnapshot(ctx context.Context, req *control.NetmapSnapshotRequest) (*control.NetmapSnapshotResponse, error) {
// verify request
if err := s.isValidRequest(req); err != nil {
return nil, status.Error(codes.PermissionDenied, err.Error())
}
// get current epoch
epoch, err := s.netMapSrc.Epoch()
if err != nil {
return nil, status.Error(codes.NotFound, err.Error())
}
apiNetMap, err := s.netMapSrc.GetNetMapByEpoch(epoch)
if err != nil {
return nil, status.Error(codes.NotFound, err.Error())
}
nm := new(control.Netmap)
nm.SetEpoch(epoch)
nm.SetNodes(nodesFromAPI(apiNetMap.Nodes()))
// create and fill response
resp := new(control.NetmapSnapshotResponse)
body := new(control.NetmapSnapshotResponse_Body)
resp.SetBody(body)
body.SetNetmap(nm)
// sign the response
if err := SignMessage(s.key, resp); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
return resp, nil
}
func nodesFromAPI(apiNodes []netmapAPI.NodeInfo) []*control.NodeInfo {
nodes := make([]*control.NodeInfo, 0, len(apiNodes))
for i := range apiNodes {
node := new(control.NodeInfo)
node.SetPublicKey(apiNodes[i].PublicKey())
addrs := make([]string, 0, apiNodes[i].NumberOfNetworkEndpoints())
netmapAPI.IterateNetworkEndpoints(apiNodes[i], func(s string) {
addrs = append(addrs, s)
})
node.SetAddresses(addrs)
node.SetAttributes(attributesFromAPI(apiNodes[i]))
switch {
default:
node.SetState(control.NetmapStatus_STATUS_UNDEFINED)
case apiNodes[i].IsOnline():
node.SetState(control.NetmapStatus_ONLINE)
case apiNodes[i].IsOffline():
node.SetState(control.NetmapStatus_OFFLINE)
}
nodes = append(nodes, node)
}
return nodes
}
func attributesFromAPI(apiNode netmapAPI.NodeInfo) []*control.NodeInfo_Attribute {
attrs := make([]*control.NodeInfo_Attribute, 0, apiNode.NumberOfAttributes())
apiNode.IterateAttributes(func(key, value string) {
a := new(control.NodeInfo_Attribute)
a.SetKey(key)
a.SetValue(value)
attrs = append(attrs, a)
})
return attrs
}

View file

@ -28,27 +28,6 @@ func (x *HealthCheckResponse) SetBody(v *HealthCheckResponse_Body) {
}
}
// SetBody sets get netmap snapshot request body.
func (x *NetmapSnapshotRequest) SetBody(v *NetmapSnapshotRequest_Body) {
if x != nil {
x.Body = v
}
}
// SetNetmap sets structure of the current network map.
func (x *NetmapSnapshotResponse_Body) SetNetmap(v *Netmap) {
if x != nil {
x.Netmap = v
}
}
// SetBody sets get netmap snapshot response body.
func (x *NetmapSnapshotResponse) SetBody(v *NetmapSnapshotResponse_Body) {
if x != nil {
x.Body = v
}
}
// SetStatus sets new storage node status in NeoFS network map.
func (x *SetNetmapStatusRequest_Body) SetStatus(v NetmapStatus) {
if x != nil {

Binary file not shown.

View file

@ -11,9 +11,6 @@ service ControlService {
// Performs health check of the storage node.
rpc HealthCheck (HealthCheckRequest) returns (HealthCheckResponse);
// Returns network map snapshot of the current NeoFS epoch.
rpc NetmapSnapshot (NetmapSnapshotRequest) returns (NetmapSnapshotResponse);
// Sets status of the storage node in NeoFS network map.
rpc SetNetmapStatus (SetNetmapStatusRequest) returns (SetNetmapStatusResponse);
@ -70,34 +67,6 @@ message HealthCheckResponse {
Signature signature = 2;
}
// Get netmap snapshot request.
message NetmapSnapshotRequest {
// Get netmap snapshot request body.
message Body {
}
// Body of get netmap snapshot request message.
Body body = 1;
// Body signature.
Signature signature = 2;
}
// Get netmap snapshot request.
message NetmapSnapshotResponse {
// Get netmap snapshot response body
message Body {
// Structure of the requested network map.
Netmap netmap = 1 [json_name = "netmap"];
}
// Body of get netmap snapshot response message.
Body body = 1;
// Body signature.
Signature signature = 2;
}
// Set netmap status request.
message SetNetmapStatusRequest {
// Set netmap status request body.

Binary file not shown.

Binary file not shown.

View file

@ -33,30 +33,6 @@ func equalHealthCheckResponseBodies(b1, b2 *control.HealthCheckResponse_Body) bo
b1.GetHealthStatus() == b2.GetHealthStatus()
}
func TestNetmapSnapshotResponse_Body_StableMarshal(t *testing.T) {
testStableMarshal(t,
generateNetmapSnapshotResponseBody(),
new(control.NetmapSnapshotResponse_Body),
func(m1, m2 protoMessage) bool {
return equalNetmapSnapshotResponseBodies(
m1.(*control.NetmapSnapshotResponse_Body),
m2.(*control.NetmapSnapshotResponse_Body),
)
},
)
}
func generateNetmapSnapshotResponseBody() *control.NetmapSnapshotResponse_Body {
body := new(control.NetmapSnapshotResponse_Body)
body.SetNetmap(generateNetmap())
return body
}
func equalNetmapSnapshotResponseBodies(b1, b2 *control.NetmapSnapshotResponse_Body) bool {
return equalNetmaps(b1.GetNetmap(), b2.GetNetmap())
}
func TestSetNetmapStatusRequest_Body_StableMarshal(t *testing.T) {
testStableMarshal(t,
generateSetNetmapStatusRequestBody(),

Binary file not shown.

View file

@ -26,6 +26,10 @@ type NodeState interface {
// Must return current node state
// in NeoFS API v2 NodeInfo structure.
LocalNodeInfo() (*netmap.NodeInfo, error)
// ProcessCurrentNetMap passes current local network map of the storage node
// into the given handler.
ProcessCurrentNetMap(func(netmapSDK.NetMap))
}
// NetworkInfo encapsulates source of the
@ -123,3 +127,19 @@ func (s *executorSvc) NetworkInfo(
return resp, nil
}
func (s *executorSvc) Snapshot(_ context.Context, req *netmap.SnapshotRequest) (*netmap.SnapshotResponse, error) {
var nmV2 netmap.NetMap
s.state.ProcessCurrentNetMap(func(netMap netmapSDK.NetMap) {
netMap.WriteToV2(&nmV2)
})
body := new(netmap.SnapshotResponseBody)
body.SetNetMap(&nmV2)
resp := new(netmap.SnapshotResponse)
resp.SetBody(body)
return resp, nil
}

View file

@ -48,3 +48,16 @@ func (s *responseService) NetworkInfo(ctx context.Context, req *netmap.NetworkIn
return resp.(*netmap.NetworkInfoResponse), nil
}
func (s *responseService) Snapshot(ctx context.Context, req *netmap.SnapshotRequest) (*netmap.SnapshotResponse, error) {
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req interface{}) (util.ResponseMessage, error) {
return s.svc.Snapshot(ctx, req.(*netmap.SnapshotRequest))
},
)
if err != nil {
return nil, err
}
return resp.(*netmap.SnapshotResponse), nil
}

View file

@ -10,4 +10,5 @@ import (
type Server interface {
LocalNodeInfo(context.Context, *netmap.LocalNodeInfoRequest) (*netmap.LocalNodeInfoResponse, error)
NetworkInfo(context.Context, *netmap.NetworkInfoRequest) (*netmap.NetworkInfoResponse, error)
Snapshot(context.Context, *netmap.SnapshotRequest) (*netmap.SnapshotResponse, error)
}

View file

@ -54,3 +54,19 @@ func (s *signService) NetworkInfo(ctx context.Context, req *netmap.NetworkInfoRe
return resp.(*netmap.NetworkInfoResponse), nil
}
func (s *signService) Snapshot(ctx context.Context, req *netmap.SnapshotRequest) (*netmap.SnapshotResponse, error) {
resp, err := s.sigSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req interface{}) (util.ResponseMessage, error) {
return s.svc.Snapshot(ctx, req.(*netmap.SnapshotRequest))
},
func() util.ResponseMessage {
return new(netmap.SnapshotResponse)
},
)
if err != nil {
return nil, err
}
return resp.(*netmap.SnapshotResponse), nil
}

Binary file not shown.

Binary file not shown.

Binary file not shown.