[#1680] morph/netmap: Adopt to recent contract changes

After recent Netmap contract changes all read methods which return
network map (either candidates or snapshots) encode node descriptors
into same structure.

Decode `netmap.Node` contract-side structure from the call results.
Replace node state with the value from the `netmap.Node.State` field.

Signed-off-by: Leonard Lyubich <ctulhurider@gmail.com>
This commit is contained in:
Leonard Lyubich 2022-09-28 15:34:28 +04:00 committed by fyrchik
parent d6c01199c8
commit eb1fba5182
14 changed files with 133 additions and 219 deletions

View file

@ -45,7 +45,7 @@ func removeNodesCmd(cmd *cobra.Command, args []string) error {
bw := io.NewBufBinWriter()
for i := range nodeKeys {
emit.AppCall(bw.BinWriter, nmHash, "updateStateIR", callflag.All,
int64(netmapcontract.OfflineState), nodeKeys[i].Bytes())
int64(netmapcontract.NodeStateOffline), nodeKeys[i].Bytes())
}
if err := emitNewEpochCall(bw, wCtx, nmHash); err != nil {

2
go.mod
View file

@ -18,7 +18,7 @@ require (
github.com/nspcc-dev/neo-go v0.99.2
github.com/nspcc-dev/neo-go/pkg/interop v0.0.0-20220809123759-3094d3e0c14b // indirect
github.com/nspcc-dev/neofs-api-go/v2 v2.13.2-0.20220919124434-cf868188ef9c
github.com/nspcc-dev/neofs-contract v0.15.5
github.com/nspcc-dev/neofs-contract v0.15.5-0.20220930133158-d95bc535894c
github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.6.0.20220926102839-c6576c8112ee
github.com/nspcc-dev/tzhash v1.6.1
github.com/panjf2000/ants/v2 v2.4.0

BIN
go.sum

Binary file not shown.

View file

@ -40,7 +40,7 @@ func (ap *Processor) processEmit() {
return
}
networkMap, err := ap.netmapClient.Snapshot()
networkMap, err := ap.netmapClient.NetMap()
if err != nil {
ap.log.Warn("can't get netmap snapshot to emit gas to storage nodes",
zap.String("error", err.Error()))

View file

@ -37,7 +37,7 @@ func (np *Processor) processNewEpoch(ev netmapEvent.NewEpoch) {
}
// get new netmap snapshot
networkMap, err := np.netmapClient.Snapshot()
networkMap, err := np.netmapClient.NetMap()
if err != nil {
np.log.Warn("can't get netmap snapshot to perform cleanup",
zap.String("error", err.Error()))

View file

@ -169,14 +169,12 @@ func (np *Processor) processRemoveSubnetNode(ev subnetEvent.RemoveNode) {
return
}
candidateNodes := candidates.Nodes()
for i := range candidateNodes {
if !bytes.Equal(candidateNodes[i].PublicKey(), ev.Node()) {
for i := range candidates {
if !bytes.Equal(candidates[i].PublicKey(), ev.Node()) {
continue
}
err = candidateNodes[i].IterateSubnets(func(subNetID subnetid.ID) error {
err = candidates[i].IterateSubnets(func(subNetID subnetid.ID) error {
if subNetID.Equals(subnetToRemoveFrom) {
return netmap.ErrRemoveSubnet
}
@ -198,7 +196,7 @@ func (np *Processor) processRemoveSubnetNode(ev subnetEvent.RemoveNode) {
}
} else {
prm := netmapclient.AddPeerPrm{}
prm.SetNodeInfo(candidateNodes[i])
prm.SetNodeInfo(candidates[i])
prm.SetHash(ev.TxHash())
err = np.netmapClient.AddPeer(prm)

View file

@ -301,10 +301,8 @@ func (s *Server) handleSubnetRemoval(e event.Event) {
return
}
candidateNodes := candidates.Nodes()
for i := range candidateNodes {
s.processCandidate(delEv.TxHash(), removedID, candidateNodes[i])
for i := range candidates {
s.processCandidate(delEv.TxHash(), removedID, candidates[i])
}
}

View file

@ -4,19 +4,13 @@ import (
"fmt"
"github.com/nspcc-dev/neo-go/pkg/vm/stackitem"
netmapcontract "github.com/nspcc-dev/neofs-contract/netmap"
"github.com/nspcc-dev/neofs-node/pkg/morph/client"
"github.com/nspcc-dev/neofs-sdk-go/netmap"
)
const (
nodeInfoFixedPrmNumber = 1
peerWithStateFixedPrmNumber = 2
)
// GetNetMapByEpoch receives information list about storage nodes
// through the Netmap contract call, composes network map
// from them and returns it. Returns snapshot of the specified epoch number.
// GetNetMapByEpoch calls "snapshotByEpoch" method with the given epoch and
// decodes netmap.NetMap from the response.
func (c *Client) GetNetMapByEpoch(epoch uint64) (*netmap.NetMap, error) {
invokePrm := client.TestInvokePrm{}
invokePrm.SetMethod(epochSnapshotMethod)
@ -28,145 +22,126 @@ func (c *Client) GetNetMapByEpoch(epoch uint64) (*netmap.NetMap, error) {
epochSnapshotMethod, err)
}
nm, err := unmarshalNetmap(res, epochSnapshotMethod)
if err == nil {
nm.SetEpoch(epoch)
nm, err := decodeNetMap(res)
if err != nil {
return nil, err
}
nm.SetEpoch(epoch)
return nm, err
}
// GetCandidates receives information list about candidates
// for the next epoch network map through the Netmap contract
// call, composes network map from them and returns it.
func (c *Client) GetCandidates() (*netmap.NetMap, error) {
// GetCandidates calls "netmapCandidates" method and decodes []netmap.NodeInfo
// from the response.
func (c *Client) GetCandidates() ([]netmap.NodeInfo, error) {
invokePrm := client.TestInvokePrm{}
invokePrm.SetMethod(netMapCandidatesMethod)
prms, err := c.client.TestInvoke(invokePrm)
res, err := c.client.TestInvoke(invokePrm)
if err != nil {
return nil, fmt.Errorf("could not perform test invocation (%s): %w", netMapCandidatesMethod, err)
}
candVals, err := nodeInfosFromStackItems(prms, netMapCandidatesMethod)
if err != nil {
return nil, fmt.Errorf("could not parse contract response: %w", err)
if len(res) > 0 {
return decodeNodeList(res[0])
}
var nm netmap.NetMap
nm.SetNodes(candVals)
return &nm, nil
return nil, nil
}
// NetMap performs the test invoke of get network map
// method of NeoFS Netmap contract.
func (c *Client) NetMap() ([][]byte, error) {
// NetMap calls "netmap" method and decode netmap.NetMap from the response.
func (c *Client) NetMap() (*netmap.NetMap, error) {
invokePrm := client.TestInvokePrm{}
invokePrm.SetMethod(netMapMethod)
prms, err := c.client.TestInvoke(invokePrm)
res, err := c.client.TestInvoke(invokePrm)
if err != nil {
return nil, fmt.Errorf("could not perform test invocation (%s): %w",
netMapMethod, err)
}
return peersFromStackItems(prms, netMapMethod)
return decodeNetMap(res)
}
func nodeInfosFromStackItems(stack []stackitem.Item, method string) ([]netmap.NodeInfo, error) {
if ln := len(stack); ln != 1 {
return nil, fmt.Errorf("unexpected stack item count (%s): %d", method, ln)
}
func decodeNetMap(resStack []stackitem.Item) (*netmap.NetMap, error) {
var nm netmap.NetMap
netmapNodes, err := client.ArrayFromStackItem(stack[0])
if len(resStack) > 0 {
nodes, err := decodeNodeList(resStack[0])
if err != nil {
return nil, fmt.Errorf("could not get stack item array from stack item (%s): %w", method, err)
return nil, err
}
res := make([]netmap.NodeInfo, len(netmapNodes))
for i := range netmapNodes {
err := stackItemToNodeInfo(netmapNodes[i], &res[i])
if err != nil {
return nil, fmt.Errorf("could not parse stack item (Peer #%d): %w", i, err)
}
nm.SetNodes(nodes)
}
return res, nil
return &nm, nil
}
func stackItemToNodeInfo(prm stackitem.Item, res *netmap.NodeInfo) error {
prms, err := client.ArrayFromStackItem(prm)
func decodeNodeList(itemNodes stackitem.Item) ([]netmap.NodeInfo, error) {
itemArrNodes, err := client.ArrayFromStackItem(itemNodes)
if err != nil {
return fmt.Errorf("could not get stack item array (PeerWithState): %w", err)
} else if ln := len(prms); ln != peerWithStateFixedPrmNumber {
return fmt.Errorf(
"unexpected stack item count (PeerWithState): expected %d, has %d",
peerWithStateFixedPrmNumber,
ln,
)
return nil, fmt.Errorf("decode item array of nodes from the response item: %w", err)
}
peer, err := peerInfoFromStackItem(prms[0])
var nodes []netmap.NodeInfo
if len(itemArrNodes) > 0 {
nodes = make([]netmap.NodeInfo, len(itemArrNodes))
for i := range itemArrNodes {
err = decodeNodeInfo(&nodes[i], itemArrNodes[i])
if err != nil {
return fmt.Errorf("could not get bytes from 'node' field of PeerWithState: %w", err)
} else if err = res.Unmarshal(peer); err != nil {
return fmt.Errorf("can't unmarshal peer info: %w", err)
return nil, fmt.Errorf("decode node #%d: %w", i+1, err)
}
}
}
// state
state, err := client.IntFromStackItem(prms[1])
return nodes, nil
}
func decodeNodeInfo(dst *netmap.NodeInfo, itemNode stackitem.Item) error {
nodeFields, err := client.ArrayFromStackItem(itemNode)
if err != nil {
return fmt.Errorf("could not get int from 'state' field of PeerWithState: %w", err)
return fmt.Errorf("decode item array of node fields: %w", err)
}
switch state {
case 1:
res.SetOnline()
case 2:
res.SetOffline()
var node netmapcontract.Node
if len(nodeFields) > 0 {
node.BLOB, err = client.BytesFromStackItem(nodeFields[0])
if err != nil {
return fmt.Errorf("decode node info BLOB: %w", err)
}
}
node.State = netmapcontract.NodeStateOnline
if len(nodeFields) > 1 {
state, err := client.IntFromStackItem(nodeFields[1])
if err != nil {
return fmt.Errorf("decode integer from 2nd item: %w", err)
}
node.State = netmapcontract.NodeState(state)
}
err = dst.Unmarshal(node.BLOB)
if err != nil {
return fmt.Errorf("decode node info: %w", err)
}
switch node.State {
default:
return fmt.Errorf("unsupported state %v", node.State)
case netmapcontract.NodeStateOnline:
dst.SetOnline()
case netmapcontract.NodeStateOffline:
dst.SetOffline()
case netmapcontract.NodeStateMaintenance:
dst.SetMaintenance()
}
return nil
}
func peersFromStackItems(stack []stackitem.Item, method string) ([][]byte, error) {
if ln := len(stack); ln != 1 {
return nil, fmt.Errorf("unexpected stack item count (%s): %d",
method, ln)
}
peers, err := client.ArrayFromStackItem(stack[0])
if err != nil {
return nil, fmt.Errorf("could not get stack item array from stack item (%s): %w",
method, err)
}
res := make([][]byte, 0, len(peers))
for i := range peers {
peer, err := peerInfoFromStackItem(peers[i])
if err != nil {
return nil, fmt.Errorf("could not parse stack item (Peer #%d): %w", i, err)
}
res = append(res, peer)
}
return res, nil
}
func peerInfoFromStackItem(prm stackitem.Item) ([]byte, error) {
prms, err := client.ArrayFromStackItem(prm)
if err != nil {
return nil, fmt.Errorf("could not get stack item array (PeerInfo): %w", err)
} else if ln := len(prms); ln != nodeInfoFixedPrmNumber {
return nil, fmt.Errorf(
"unexpected stack item count (PeerInfo): expected %d, has %d",
nodeInfoFixedPrmNumber,
ln,
)
}
return client.BytesFromStackItem(prms[0])
}

View file

@ -19,10 +19,12 @@ func Test_stackItemsToNodeInfos(t *testing.T) {
rand.Read(pub)
switch i % 3 {
case int(netmapcontract.OfflineState):
default:
expected[i].SetOffline()
case int(netmapcontract.OnlineState):
case int(netmapcontract.NodeStateOnline):
expected[i].SetOnline()
case int(netmapcontract.NodeStateMaintenance):
expected[i].SetMaintenance()
}
expected[i].SetPublicKey(pub)
@ -38,20 +40,20 @@ func Test_stackItemsToNodeInfos(t *testing.T) {
switch {
case expected[i].IsOnline():
state = int64(netmapcontract.OnlineState)
state = int64(netmapcontract.NodeStateOnline)
case expected[i].IsOffline():
state = int64(netmapcontract.OfflineState)
state = int64(netmapcontract.NodeStateOffline)
case expected[i].IsMaintenance():
state = int64(netmapcontract.NodeStateMaintenance)
}
items[i] = stackitem.NewStruct([]stackitem.Item{
stackitem.NewStruct([]stackitem.Item{
stackitem.NewByteArray(data),
}),
stackitem.NewBigInteger(big.NewInt(state)),
})
}
actual, err := nodeInfosFromStackItems([]stackitem.Item{stackitem.NewArray(items)}, "")
actual, err := decodeNodeList(stackitem.NewArray(items))
require.NoError(t, err)
require.Equal(t, expected, actual)
}

View file

@ -1,28 +1,12 @@
package netmap
import (
"fmt"
"github.com/nspcc-dev/neo-go/pkg/vm/stackitem"
"github.com/nspcc-dev/neofs-node/pkg/morph/client"
"github.com/nspcc-dev/neofs-sdk-go/netmap"
)
// GetNetMap receives information list about storage nodes
// through the Netmap contract call, composes network map
// from them and returns it. With diff == 0 returns current
// network map, else return snapshot of previous network map.
// GetNetMap calls "snapshot" method and decodes netmap.NetMap from the response.
func (c *Client) GetNetMap(diff uint64) (*netmap.NetMap, error) {
return c.getNetMap(diff)
}
// Snapshot returns current netmap node infos.
// Consider using pkg/morph/client/netmap for this.
func (c *Client) Snapshot() (*netmap.NetMap, error) {
return c.getNetMap(0)
}
func (c *Client) getNetMap(diff uint64) (*netmap.NetMap, error) {
prm := client.TestInvokePrm{}
prm.SetMethod(snapshotMethod)
prm.SetArgs(diff)
@ -32,24 +16,5 @@ func (c *Client) getNetMap(diff uint64) (*netmap.NetMap, error) {
return nil, err
}
return unmarshalNetmap(res, snapshotMethod)
}
func unmarshalNetmap(items []stackitem.Item, method string) (*netmap.NetMap, error) {
rawPeers, err := peersFromStackItems(items, method)
if err != nil {
return nil, err
}
result := make([]netmap.NodeInfo, len(rawPeers))
for i := range rawPeers {
if err := result[i].Unmarshal(rawPeers[i]); err != nil {
return nil, fmt.Errorf("can't unmarshal node info (%s): %w", method, err)
}
}
var nm netmap.NetMap
nm.SetNodes(result)
return &nm, nil
return decodeNetMap(res)
}

View file

@ -7,18 +7,11 @@ import (
"github.com/nspcc-dev/neofs-node/pkg/morph/client"
)
// TODO: enum can become redundant after neofs-contract#270
const (
stateOffline int8 = iota
stateOnline
stateMaintenance
)
// UpdatePeerPrm groups parameters of UpdatePeerState operation.
type UpdatePeerPrm struct {
key []byte
state int8 // state enum value
state netmap.NodeState
client.InvokePrmOptional
}
@ -32,14 +25,14 @@ func (u *UpdatePeerPrm) SetKey(key []byte) {
//
// Zero UpdatePeerPrm marks node as "offline".
func (u *UpdatePeerPrm) SetOnline() {
u.state = stateOnline
u.state = netmap.NodeStateOnline
}
// SetMaintenance marks node to be switched into "maintenance" state.
//
// Zero UpdatePeerPrm marks node as "offline".
func (u *UpdatePeerPrm) SetMaintenance() {
u.state = stateMaintenance
u.state = netmap.NodeStateMaintenance
}
// UpdatePeerState changes peer status through Netmap contract call.
@ -53,22 +46,9 @@ func (c *Client) UpdatePeerState(p UpdatePeerPrm) error {
method += "IR"
}
state := netmap.OfflineState // pre-assign since type of value is unexported
switch p.state {
default:
panic(fmt.Sprintf("unexpected node's state value %v", p.state))
case stateOffline:
// already set above
case stateOnline:
state = netmap.OnlineState
case stateMaintenance:
state = netmap.OfflineState + 1 // FIXME: use named constant after neofs-contract#269
}
prm := client.InvokePrm{}
prm.SetMethod(method)
prm.SetArgs(int64(state), p.key)
prm.SetArgs(int64(p.state), p.key)
prm.InvokePrmOptional = p.InvokePrmOptional
if err := c.client.Invoke(prm); err != nil {

View file

@ -12,17 +12,10 @@ import (
"github.com/nspcc-dev/neofs-node/pkg/morph/event"
)
// TODO: enum can become redundant after neofs-contract#270
const (
_ int8 = iota
stateOnline
stateMaintenance
)
type UpdatePeer struct {
publicKey *keys.PublicKey
state int8 // state enum value
state netmap.NodeState
// For notary notifications only.
// Contains raw transactions of notary request.
@ -35,13 +28,13 @@ func (UpdatePeer) MorphEvent() {}
// Online returns true if node's state is requested to be switched
// to "online".
func (s UpdatePeer) Online() bool {
return s.state == stateOnline
return s.state == netmap.NodeStateOnline
}
// Maintenance returns true if node's state is requested to be switched
// to "maintenance".
func (s UpdatePeer) Maintenance() bool {
return s.state == stateMaintenance
return s.state == netmap.NodeStateMaintenance
}
func (s UpdatePeer) PublicKey() *keys.PublicKey {
@ -54,6 +47,18 @@ func (s UpdatePeer) NotaryRequest() *payload.P2PNotaryRequest {
return s.notaryRequest
}
func (s *UpdatePeer) decodeState(state int64) error {
switch s.state = netmap.NodeState(state); s.state {
default:
return fmt.Errorf("unsupported node state %d", state)
case
netmap.NodeStateOffline,
netmap.NodeStateOnline,
netmap.NodeStateMaintenance:
return nil
}
}
const expectedItemNumUpdatePeer = 2
func ParseUpdatePeer(e *state.ContainedNotificationEvent) (event.Event, error) {
@ -88,14 +93,9 @@ func ParseUpdatePeer(e *state.ContainedNotificationEvent) (event.Event, error) {
return nil, fmt.Errorf("could not get node status: %w", err)
}
switch st {
default:
return nil, fmt.Errorf("unsupported node state %d", st)
case int64(netmap.OfflineState):
case int64(netmap.OnlineState):
ev.state = stateOnline
case int64(netmap.OfflineState) + 1: // FIXME: use named constant after neofs-contract#269
ev.state = stateMaintenance
err = ev.decodeState(st)
if err != nil {
return nil, err
}
return ev, nil

View file

@ -7,7 +7,6 @@ import (
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/vm/opcode"
"github.com/nspcc-dev/neofs-contract/netmap"
"github.com/nspcc-dev/neofs-node/pkg/morph/event"
)
@ -61,14 +60,9 @@ func ParseUpdatePeerNotary(ne event.NotaryEvent) (event.Event, error) {
return nil, err
}
switch state {
default:
return nil, fmt.Errorf("unsupported node state %d", err)
case int64(netmap.OfflineState):
case int64(netmap.OnlineState):
ev.state = stateOnline
case int64(netmap.OfflineState) + 1: // FIXME: use named constant after neofs-contract#269
ev.state = stateMaintenance
err = ev.decodeState(state)
if err != nil {
return nil, err
}
fieldNum++

View file

@ -6,6 +6,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/vm/stackitem"
"github.com/nspcc-dev/neofs-contract/netmap"
"github.com/nspcc-dev/neofs-node/pkg/morph/event"
"github.com/stretchr/testify/require"
)
@ -43,15 +44,16 @@ func TestParseUpdatePeer(t *testing.T) {
})
t.Run("correct behavior", func(t *testing.T) {
const state = netmap.NodeStateMaintenance
ev, err := ParseUpdatePeer(createNotifyEventFromItems([]stackitem.Item{
stackitem.NewBigInteger(new(big.Int).SetInt64(1)),
stackitem.NewBigInteger(big.NewInt(int64(state))),
stackitem.NewByteArray(publicKey.Bytes()),
}))
require.NoError(t, err)
require.Equal(t, UpdatePeer{
publicKey: publicKey,
state: stateOnline,
state: state,
}, ev)
})
}