[#1513] Upgrade NeoFS SDK Go with changed netmap package

Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
Leonard Lyubich 2022-06-09 02:18:26 +03:00 committed by LeL
parent 24b4c1ecf4
commit 21d2f8f861
70 changed files with 878 additions and 992 deletions

View file

@ -1,69 +1,14 @@
package main
import (
"fmt"
"strconv"
"github.com/nspcc-dev/neofs-node/cmd/neofs-node/config"
nodeconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/node"
"github.com/nspcc-dev/neofs-node/pkg/util/attributes"
"github.com/nspcc-dev/neofs-sdk-go/netmap"
)
const (
// list of default values for well-known attributes
defaultCapacity = 0
defaultPrice = 0
)
func parseAttributes(c *config.Config) []netmap.NodeAttribute {
if nodeconfig.Relay(c) {
return nil
func parseAttributes(c *cfg) {
if nodeconfig.Relay(c.appCfg) {
return
}
stringAttributes := nodeconfig.Attributes(c)
attrs, err := attributes.ParseV2Attributes(stringAttributes, nil)
if err != nil {
fatalOnErr(err)
}
return addWellKnownAttributes(attrs)
}
type wellKnownNodeAttrDesc struct {
explicit bool
defaultVal string
}
func listWellKnownAttrDesc() map[string]wellKnownNodeAttrDesc {
return map[string]wellKnownNodeAttrDesc{
netmap.AttrPrice: {defaultVal: strconv.FormatUint(defaultPrice, 10)},
netmap.AttrCapacity: {defaultVal: strconv.FormatUint(defaultCapacity, 10)},
netmap.AttrUNLOCODE: {explicit: true},
}
}
func addWellKnownAttributes(attrs []netmap.NodeAttribute) []netmap.NodeAttribute {
mWellKnown := listWellKnownAttrDesc()
// check how user defined well-known attributes
for i := range attrs {
delete(mWellKnown, attrs[i].Key())
}
for key, desc := range mWellKnown {
// check if required attribute is set
if desc.explicit {
fatalOnErr(fmt.Errorf("missing explicit value of required node attribute %s", key))
}
// set default value of the attribute
index := len(attrs)
attrs = append(attrs, netmap.NodeAttribute{})
attrs[index].SetKey(key)
attrs[index].SetValue(desc.defaultVal)
}
return attrs
fatalOnErr(attributes.ReadNodeAttributes(&c.cfgNodeInfo.localInfo, nodeconfig.Attributes(c.appCfg)))
}

View file

@ -213,21 +213,21 @@ func newCachedNetmapStorage(s netmap.State, v netmap.Source) netmap.Source {
}
}
func (s *lruNetmapSource) GetNetMap(diff uint64) (*netmapSDK.Netmap, error) {
func (s *lruNetmapSource) GetNetMap(diff uint64) (*netmapSDK.NetMap, error) {
return s.getNetMapByEpoch(s.netState.CurrentEpoch() - diff)
}
func (s *lruNetmapSource) GetNetMapByEpoch(epoch uint64) (*netmapSDK.Netmap, error) {
func (s *lruNetmapSource) GetNetMapByEpoch(epoch uint64) (*netmapSDK.NetMap, error) {
return s.getNetMapByEpoch(epoch)
}
func (s *lruNetmapSource) getNetMapByEpoch(epoch uint64) (*netmapSDK.Netmap, error) {
func (s *lruNetmapSource) getNetMapByEpoch(epoch uint64) (*netmapSDK.NetMap, error) {
val, err := s.cache.get(epoch)
if err != nil {
return nil, err
}
return val.(*netmapSDK.Netmap), nil
return val.(*netmapSDK.NetMap), nil
}
func (s *lruNetmapSource) Epoch() (uint64, error) {

View file

@ -480,15 +480,21 @@ func initObjectPool(cfg *config.Config) (pool cfgObjectRoutines) {
}
func (c *cfg) LocalNodeInfo() (*netmapV2.NodeInfo, error) {
ni := c.cfgNetmap.state.getNodeInfo()
if ni != nil {
return ni.ToV2(), nil
var res netmapV2.NodeInfo
ni, ok := c.cfgNetmap.state.getNodeInfo()
if ok {
ni.WriteToV2(&res)
} else {
c.cfgNodeInfo.localInfo.WriteToV2(&res)
}
return c.cfgNodeInfo.localInfo.ToV2(), nil
return &res, nil
}
// handleLocalNodeInfo rewrites local node info from netmap
// handleLocalNodeInfo rewrites local node info from the NeoFS network map.
// Called with nil when storage node is outside the NeoFS network map
// (before entering the network and after leaving it).
func (c *cfg) handleLocalNodeInfo(ni *netmap.NodeInfo) {
c.cfgNetmap.state.setNodeInfo(ni)
}
@ -496,10 +502,10 @@ func (c *cfg) handleLocalNodeInfo(ni *netmap.NodeInfo) {
// bootstrap sets local node's netmap status to "online".
func (c *cfg) bootstrap() error {
ni := c.cfgNodeInfo.localInfo
ni.SetState(netmap.NodeStateOnline)
ni.SetOnline()
prm := nmClient.AddPeerPrm{}
prm.SetNodeInfo(&ni)
prm.SetNodeInfo(ni)
return c.cfgNetmap.wrapper.AddPeer(prm)
}

View file

@ -329,7 +329,7 @@ type loadPlacementBuilder struct {
cnrSrc containerCore.Source
}
func (l *loadPlacementBuilder) BuildPlacement(epoch uint64, cnr cid.ID) ([]netmap.Nodes, error) {
func (l *loadPlacementBuilder) BuildPlacement(epoch uint64, cnr cid.ID) ([][]netmap.NodeInfo, error) {
cnrNodes, nm, err := l.buildPlacement(epoch, cnr)
if err != nil {
return nil, err
@ -341,7 +341,7 @@ func (l *loadPlacementBuilder) BuildPlacement(epoch uint64, cnr cid.ID) ([]netma
pivotPrefix + strconv.FormatUint(epoch, 10),
)
placement, err := nm.GetPlacementVectors(cnrNodes, pivot)
placement, err := nm.PlacementVectors(cnrNodes, pivot)
if err != nil {
return nil, fmt.Errorf("could not build placement vectors: %w", err)
}
@ -349,12 +349,17 @@ func (l *loadPlacementBuilder) BuildPlacement(epoch uint64, cnr cid.ID) ([]netma
return placement, nil
}
func (l *loadPlacementBuilder) buildPlacement(epoch uint64, idCnr cid.ID) (netmap.ContainerNodes, *netmap.Netmap, error) {
func (l *loadPlacementBuilder) buildPlacement(epoch uint64, idCnr cid.ID) ([][]netmap.NodeInfo, *netmap.NetMap, error) {
cnr, err := l.cnrSrc.Get(idCnr)
if err != nil {
return nil, nil, err
}
policy := cnr.PlacementPolicy()
if policy == nil {
return nil, nil, errors.New("missing placement policy in container")
}
nm, err := l.nmSrc.GetNetMapByEpoch(epoch)
if err != nil {
return nil, nil, fmt.Errorf("could not get network map: %w", err)
@ -363,7 +368,7 @@ func (l *loadPlacementBuilder) buildPlacement(epoch uint64, idCnr cid.ID) (netma
binCnr := make([]byte, sha256.Size)
idCnr.Encode(binCnr)
cnrNodes, err := nm.GetContainerNodes(cnr.PlacementPolicy(), binCnr)
cnrNodes, err := nm.ContainerNodes(*policy, binCnr)
if err != nil {
return nil, nil, fmt.Errorf("could not build container nodes: %w", err)
}
@ -512,9 +517,9 @@ func (l *loadPlacementBuilder) isNodeFromContainerKey(epoch uint64, cnr cid.ID,
return false, err
}
for _, vector := range cnrNodes.Replicas() {
for _, node := range vector {
if bytes.Equal(node.PublicKey(), key) {
for i := range cnrNodes {
for j := range cnrNodes[i] {
if bytes.Equal(cnrNodes[i][j].PublicKey(), key) {
return true, nil
}
}

View file

@ -5,9 +5,7 @@ import (
"errors"
"fmt"
netmapV2 "github.com/nspcc-dev/neofs-api-go/v2/netmap"
netmapGRPC "github.com/nspcc-dev/neofs-api-go/v2/netmap/grpc"
"github.com/nspcc-dev/neofs-api-go/v2/refs"
nodeconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/node"
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
"github.com/nspcc-dev/neofs-node/pkg/metrics"
@ -20,6 +18,7 @@ import (
netmapService "github.com/nspcc-dev/neofs-node/pkg/services/netmap"
netmapSDK "github.com/nspcc-dev/neofs-sdk-go/netmap"
subnetid "github.com/nspcc-dev/neofs-sdk-go/subnet/id"
"github.com/nspcc-dev/neofs-sdk-go/version"
"go.uber.org/atomic"
"go.uber.org/zap"
)
@ -53,16 +52,18 @@ func (s *networkState) setCurrentEpoch(v uint64) {
}
func (s *networkState) setNodeInfo(ni *netmapSDK.NodeInfo) {
s.nodeInfo.Store(ni)
ctrlNetSt := control.NetmapStatus_STATUS_UNDEFINED
var ctrlNetSt control.NetmapStatus
if ni != nil {
s.nodeInfo.Store(*ni)
switch ni.State() {
default:
ctrlNetSt = control.NetmapStatus_STATUS_UNDEFINED
case netmapSDK.NodeStateOnline:
ctrlNetSt = control.NetmapStatus_ONLINE
case netmapSDK.NodeStateOffline:
switch {
case ni.IsOnline():
ctrlNetSt = control.NetmapStatus_ONLINE
case ni.IsOffline():
ctrlNetSt = control.NetmapStatus_OFFLINE
}
} else {
ctrlNetSt = control.NetmapStatus_OFFLINE
}
@ -73,27 +74,48 @@ func (s *networkState) controlNetmapStatus() control.NetmapStatus {
return s.controlNetStatus.Load().(control.NetmapStatus)
}
func (s *networkState) getNodeInfo() *netmapSDK.NodeInfo {
return s.nodeInfo.Load().(*netmapSDK.NodeInfo)
func (s *networkState) getNodeInfo() (res netmapSDK.NodeInfo, ok bool) {
v := s.nodeInfo.Load()
if v != nil {
res, ok = v.(netmapSDK.NodeInfo)
if !ok {
panic(fmt.Sprintf("unexpected value in atomic node info state: %T", v))
}
}
return
}
func nodeKeyFromNetmap(c *cfg) []byte {
return c.cfgNetmap.state.getNodeInfo().PublicKey()
ni, ok := c.cfgNetmap.state.getNodeInfo()
if ok {
return ni.PublicKey()
}
return nil
}
func (c *cfg) iterateNetworkAddresses(f func(string) bool) {
c.cfgNetmap.state.getNodeInfo().IterateAddresses(f)
ni, ok := c.cfgNetmap.state.getNodeInfo()
if ok {
ni.IterateNetworkEndpoints(f)
}
}
func (c *cfg) addressNum() int {
return c.cfgNetmap.state.getNodeInfo().NumberOfAddresses()
ni, ok := c.cfgNetmap.state.getNodeInfo()
if ok {
return ni.NumberOfNetworkEndpoints()
}
return 0
}
func initNetmapService(c *cfg) {
network.WriteToNodeInfo(c.localAddr, &c.cfgNodeInfo.localInfo)
c.cfgNodeInfo.localInfo.SetPublicKey(c.key.PublicKey().Bytes())
c.cfgNodeInfo.localInfo.SetAttributes(parseAttributes(c.appCfg)...)
c.cfgNodeInfo.localInfo.SetState(netmapSDK.NodeStateOffline)
parseAttributes(c)
c.cfgNodeInfo.localInfo.SetOffline()
readSubnetCfg(c)
@ -111,10 +133,10 @@ func initNetmapService(c *cfg) {
c,
c.apiVersion,
&netInfo{
netState: c.cfgNetmap.state,
magic: c.cfgMorph.client,
netCfg: c.cfgNetmap.wrapper.IterateConfigParameters,
msPerBlockRdr: c.cfgMorph.client.MsPerBlock,
netState: c.cfgNetmap.state,
magic: c.cfgMorph.client,
morphClientNetMap: c.cfgNetmap.wrapper,
msPerBlockRdr: c.cfgMorph.client.MsPerBlock,
},
),
c.respSvc,
@ -236,14 +258,25 @@ func initNetmapState(c *cfg) {
ni, err := c.netmapLocalNodeState(epoch)
fatalOnErrDetails("could not init network state", err)
stateWord := "undefined"
if ni != nil {
switch {
case ni.IsOnline():
stateWord = "online"
case ni.IsOffline():
stateWord = "offline"
}
}
c.log.Info("initial network state",
zap.Uint64("epoch", epoch),
zap.Stringer("state", ni.State()),
zap.String("state", stateWord),
)
c.cfgNetmap.state.setCurrentEpoch(epoch)
c.cfgNetmap.startEpoch = epoch
c.cfgNetmap.state.setNodeInfo(ni)
c.handleLocalNodeInfo(ni)
}
func (c *cfg) netmapLocalNodeState(epoch uint64) (*netmapSDK.NodeInfo, error) {
@ -253,17 +286,14 @@ func (c *cfg) netmapLocalNodeState(epoch uint64) (*netmapSDK.NodeInfo, error) {
return nil, err
}
return c.localNodeInfoFromNetmap(nm), nil
}
func (c *cfg) localNodeInfoFromNetmap(nm *netmapSDK.Netmap) *netmapSDK.NodeInfo {
for _, n := range nm.Nodes {
if bytes.Equal(n.PublicKey(), c.key.PublicKey().Bytes()) {
return n.NodeInfo
nmNodes := nm.Nodes()
for i := range nmNodes {
if bytes.Equal(nmNodes[i].PublicKey(), c.key.PublicKey().Bytes()) {
return &nmNodes[i], nil
}
}
return nil
return nil, nil
}
// addNewEpochNotificationHandler adds handler that will be executed synchronously
@ -309,18 +339,11 @@ func (c *cfg) SetNetmapStatus(st control.NetmapStatus) error {
return c.bootstrap()
}
var apiState netmapSDK.NodeState
if st == control.NetmapStatus_OFFLINE {
apiState = netmapSDK.NodeStateOffline
}
c.cfgNetmap.reBoostrapTurnedOff.Store(true)
prm := nmClient.UpdatePeerPrm{}
prm.SetKey(c.key.PublicKey().Bytes())
prm.SetState(apiState)
return c.cfgNetmap.wrapper.UpdatePeerState(prm)
}
@ -332,50 +355,49 @@ type netInfo struct {
MagicNumber() (uint64, error)
}
netCfg func(func(key, value []byte) error) error
morphClientNetMap *nmClient.Client
msPerBlockRdr func() (int64, error)
}
func (n *netInfo) Dump(ver *refs.Version) (*netmapV2.NetworkInfo, error) {
func (n *netInfo) Dump(ver version.Version) (*netmapSDK.NetworkInfo, error) {
magic, err := n.magic.MagicNumber()
if err != nil {
return nil, err
}
ni := new(netmapV2.NetworkInfo)
var ni netmapSDK.NetworkInfo
ni.SetCurrentEpoch(n.netState.CurrentEpoch())
ni.SetMagicNumber(magic)
if mjr := ver.GetMajor(); mjr > 2 || mjr == 2 && ver.GetMinor() > 9 {
netInfoMorph, err := n.morphClientNetMap.ReadNetworkConfiguration()
if err != nil {
return nil, fmt.Errorf("read network configuration using netmap contract client: %w", err)
}
if mjr := ver.Major(); mjr > 2 || mjr == 2 && ver.Minor() > 9 {
msPerBlock, err := n.msPerBlockRdr()
if err != nil {
return nil, fmt.Errorf("ms per block: %w", err)
}
var (
ps []netmapV2.NetworkParameter
netCfg netmapV2.NetworkConfig
)
if err := n.netCfg(func(key, value []byte) error {
var p netmapV2.NetworkParameter
p.SetKey(key)
p.SetValue(value)
ps = append(ps, p)
return nil
}); err != nil {
return nil, fmt.Errorf("network config: %w", err)
}
netCfg.SetParameters(ps...)
ni.SetNetworkConfig(&netCfg)
ni.SetMsPerBlock(msPerBlock)
ni.SetMaxObjectSize(netInfoMorph.MaxObjectSize)
ni.SetStoragePrice(netInfoMorph.StoragePrice)
ni.SetAuditFee(netInfoMorph.AuditFee)
ni.SetEpochDuration(netInfoMorph.EpochDuration)
ni.SetContainerFee(netInfoMorph.ContainerFee)
ni.SetNamedContainerFee(netInfoMorph.ContainerAliasFee)
ni.SetNumberOfEigenTrustIterations(netInfoMorph.EigenTrustIterations)
ni.SetEigenTrustAlpha(netInfoMorph.EigenTrustAlpha)
ni.SetIRCandidateFee(netInfoMorph.IRCandidateFee)
ni.SetWithdrawalFee(netInfoMorph.WithdrawalFee)
for i := range netInfoMorph.Raw {
ni.SetRawNetworkParameter(netInfoMorph.Raw[i].Name, netInfoMorph.Raw[i].Value)
}
}
return ni, nil
return &ni, nil
}

View file

@ -530,10 +530,12 @@ func (c *reputationClientConstructor) Get(info coreclient.NodeInfo) (coreclient.
if err == nil {
key := info.PublicKey()
for i := range nm.Nodes {
if bytes.Equal(nm.Nodes[i].PublicKey(), key) {
nmNodes := nm.Nodes()
for i := range nmNodes {
if bytes.Equal(nmNodes[i].PublicKey(), key) {
prm := truststorage.UpdatePrm{}
prm.SetPeer(reputation.PeerIDFromBytes(nm.Nodes[i].PublicKey()))
prm.SetPeer(reputation.PeerIDFromBytes(nmNodes[i].PublicKey()))
return &reputationClient{
MultiAddressClient: cl.(coreclient.MultiAddressClient),

View file

@ -26,7 +26,7 @@ func (i InitialTrustSource) InitialTrust(reputation.PeerID) (reputation.TrustVal
return reputation.TrustZero, fmt.Errorf("failed to get NetMap: %w", err)
}
nodeCount := reputation.TrustValueFromFloat64(float64(len(nm.Nodes)))
nodeCount := reputation.TrustValueFromFloat64(float64(len(nm.Nodes())))
if nodeCount == 0 {
return reputation.TrustZero, ErrEmptyNetMap
}

View file

@ -66,14 +66,15 @@ func (it *TrustIterator) Iterate(h reputation.TrustHandler) error {
// find out if local node is presented in netmap
localIndex := -1
for i := range nm.Nodes {
if bytes.Equal(nm.Nodes[i].PublicKey(), it.storage.LocalKey) {
nmNodes := nm.Nodes()
for i := range nmNodes {
if bytes.Equal(nmNodes[i].PublicKey(), it.storage.LocalKey) {
localIndex = i
break
}
}
ln := len(nm.Nodes)
ln := len(nmNodes)
if localIndex >= 0 && ln > 0 {
ln--
}
@ -81,13 +82,13 @@ func (it *TrustIterator) Iterate(h reputation.TrustHandler) error {
// calculate Pj http://ilpubs.stanford.edu:8090/562/1/2002-56.pdf Chapter 4.5.
p := reputation.TrustOne.Div(reputation.TrustValueFromInt(ln))
for i := range nm.Nodes {
for i := range nmNodes {
if i == localIndex {
continue
}
trust := reputation.Trust{}
trust.SetPeer(reputation.PeerIDFromBytes(nm.Nodes[i].PublicKey()))
trust.SetPeer(reputation.PeerIDFromBytes(nmNodes[i].PublicKey()))
trust.SetValue(p)
trust.SetTrustingPeer(reputation.PeerIDFromBytes(it.storage.LocalKey))