forked from TrueCloudLab/frostfs-node
[#938] ir/netmap: Call AddPeer method if existing candidate was updated
In previous implementation IR handler of `AddPeer` notification didn't send registration to contract if existing peer changed has changed its information. as a consequence, the network map members could not update the information without going into offline. Change `processAddPeer` handler to check if * candidate in the network map is a brand new * or information about the network map member was changed and call `AddPeer` method if so. Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
parent
0ec8f529ab
commit
68565d9617
3 changed files with 66 additions and 34 deletions
|
@ -1,7 +1,9 @@
|
||||||
package netmap
|
package netmap
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-api-go/pkg/netmap"
|
"github.com/nspcc-dev/neofs-api-go/pkg/netmap"
|
||||||
|
@ -12,13 +14,19 @@ type (
|
||||||
*sync.RWMutex
|
*sync.RWMutex
|
||||||
enabled bool
|
enabled bool
|
||||||
threshold uint64
|
threshold uint64
|
||||||
lastAccess map[string]epochStamp
|
lastAccess map[string]epochStampWithNodeInfo
|
||||||
}
|
}
|
||||||
|
|
||||||
epochStamp struct {
|
epochStamp struct {
|
||||||
epoch uint64
|
epoch uint64
|
||||||
removeFlag bool
|
removeFlag bool
|
||||||
}
|
}
|
||||||
|
|
||||||
|
epochStampWithNodeInfo struct {
|
||||||
|
epochStamp
|
||||||
|
|
||||||
|
binNodeInfo []byte
|
||||||
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
func newCleanupTable(enabled bool, threshold uint64) cleanupTable {
|
func newCleanupTable(enabled bool, threshold uint64) cleanupTable {
|
||||||
|
@ -26,7 +34,7 @@ func newCleanupTable(enabled bool, threshold uint64) cleanupTable {
|
||||||
RWMutex: new(sync.RWMutex),
|
RWMutex: new(sync.RWMutex),
|
||||||
enabled: enabled,
|
enabled: enabled,
|
||||||
threshold: threshold,
|
threshold: threshold,
|
||||||
lastAccess: make(map[string]epochStamp),
|
lastAccess: make(map[string]epochStampWithNodeInfo),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -36,33 +44,51 @@ func (c *cleanupTable) update(snapshot *netmap.Netmap, now uint64) {
|
||||||
defer c.Unlock()
|
defer c.Unlock()
|
||||||
|
|
||||||
// replacing map is less memory efficient but faster
|
// replacing map is less memory efficient but faster
|
||||||
newMap := make(map[string]epochStamp, len(snapshot.Nodes))
|
newMap := make(map[string]epochStampWithNodeInfo, len(snapshot.Nodes))
|
||||||
|
|
||||||
for i := range snapshot.Nodes {
|
for i := range snapshot.Nodes {
|
||||||
keyString := hex.EncodeToString(snapshot.Nodes[i].PublicKey())
|
binNodeInfo, err := snapshot.Nodes[i].Marshal()
|
||||||
if access, ok := c.lastAccess[keyString]; ok {
|
if err != nil {
|
||||||
access.removeFlag = false // reset remove Flag on each Update
|
panic(fmt.Errorf("could not marshal node info: %w", err)) // seems better than ignore
|
||||||
newMap[keyString] = access
|
|
||||||
} else {
|
|
||||||
newMap[keyString] = epochStamp{epoch: now}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
keyString := hex.EncodeToString(snapshot.Nodes[i].PublicKey())
|
||||||
|
|
||||||
|
access, ok := c.lastAccess[keyString]
|
||||||
|
if ok {
|
||||||
|
access.removeFlag = false // reset remove Flag on each Update
|
||||||
|
} else {
|
||||||
|
access.epoch = now
|
||||||
|
}
|
||||||
|
|
||||||
|
access.binNodeInfo = binNodeInfo
|
||||||
|
|
||||||
|
newMap[keyString] = access
|
||||||
}
|
}
|
||||||
|
|
||||||
c.lastAccess = newMap
|
c.lastAccess = newMap
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cleanupTable) touch(keyString string, now uint64) bool {
|
// updates last access time of the netmap node by string public key.
|
||||||
|
//
|
||||||
|
// Returns true if at least one condition is met:
|
||||||
|
// * node hasn't been accessed yet;
|
||||||
|
// * remove flag is set;
|
||||||
|
// * binary node info has changed.
|
||||||
|
func (c *cleanupTable) touch(keyString string, now uint64, binNodeInfo []byte) bool {
|
||||||
c.Lock()
|
c.Lock()
|
||||||
defer c.Unlock()
|
defer c.Unlock()
|
||||||
|
|
||||||
access, ok := c.lastAccess[keyString]
|
access, ok := c.lastAccess[keyString]
|
||||||
result := !access.removeFlag && ok
|
result := !ok || access.removeFlag || !bytes.Equal(access.binNodeInfo, binNodeInfo)
|
||||||
|
|
||||||
access.removeFlag = false // reset remove flag on each touch
|
access.removeFlag = false // reset remove flag on each touch
|
||||||
if now > access.epoch {
|
if now > access.epoch {
|
||||||
access.epoch = now
|
access.epoch = now
|
||||||
}
|
}
|
||||||
|
|
||||||
|
access.binNodeInfo = binNodeInfo // update binary node info
|
||||||
|
|
||||||
c.lastAccess[keyString] = access
|
c.lastAccess[keyString] = access
|
||||||
|
|
||||||
return result
|
return result
|
||||||
|
|
|
@ -25,10 +25,13 @@ func TestCleanupTable(t *testing.T) {
|
||||||
networkMap, err := netmap.NewNetmap(netmap.NodesFromInfo(infos))
|
networkMap, err := netmap.NewNetmap(netmap.NodesFromInfo(infos))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
mapInfos := map[string]struct{}{
|
mapInfos := make(map[string][]byte)
|
||||||
hex.EncodeToString(infos[0].PublicKey()): {},
|
|
||||||
hex.EncodeToString(infos[1].PublicKey()): {},
|
for i := range infos {
|
||||||
hex.EncodeToString(infos[2].PublicKey()): {},
|
binNodeInfo, err := infos[i].Marshal()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
mapInfos[hex.EncodeToString(infos[i].PublicKey())] = binNodeInfo
|
||||||
}
|
}
|
||||||
|
|
||||||
t.Run("update", func(t *testing.T) {
|
t.Run("update", func(t *testing.T) {
|
||||||
|
@ -59,10 +62,15 @@ func TestCleanupTable(t *testing.T) {
|
||||||
c.update(networkMap, 1)
|
c.update(networkMap, 1)
|
||||||
|
|
||||||
key := hex.EncodeToString(infos[1].PublicKey())
|
key := hex.EncodeToString(infos[1].PublicKey())
|
||||||
require.True(t, c.touch(key, 11))
|
require.False(t, c.touch(key, 11, mapInfos[key]))
|
||||||
require.EqualValues(t, 11, c.lastAccess[key].epoch)
|
require.EqualValues(t, 11, c.lastAccess[key].epoch)
|
||||||
|
|
||||||
require.False(t, c.touch(key+"x", 12))
|
updNodeInfo := []byte("changed node info")
|
||||||
|
|
||||||
|
require.True(t, c.touch(key, 11, updNodeInfo))
|
||||||
|
require.EqualValues(t, 11, c.lastAccess[key].epoch)
|
||||||
|
|
||||||
|
require.True(t, c.touch(key+"x", 12, updNodeInfo))
|
||||||
require.EqualValues(t, 12, c.lastAccess[key+"x"].epoch)
|
require.EqualValues(t, 12, c.lastAccess[key+"x"].epoch)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -74,7 +82,7 @@ func TestCleanupTable(t *testing.T) {
|
||||||
c.flag(key)
|
c.flag(key)
|
||||||
require.True(t, c.lastAccess[key].removeFlag)
|
require.True(t, c.lastAccess[key].removeFlag)
|
||||||
|
|
||||||
require.False(t, c.touch(key, 2))
|
require.True(t, c.touch(key, 2, mapInfos[key]))
|
||||||
require.False(t, c.lastAccess[key].removeFlag)
|
require.False(t, c.lastAccess[key].removeFlag)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -108,7 +116,7 @@ func TestCleanupTable(t *testing.T) {
|
||||||
cnt := 0
|
cnt := 0
|
||||||
key := hex.EncodeToString(infos[1].PublicKey())
|
key := hex.EncodeToString(infos[1].PublicKey())
|
||||||
|
|
||||||
require.False(t, c.touch(key, 4)) // one node was updated
|
require.True(t, c.touch(key, 4, mapInfos[key])) // one node was updated
|
||||||
|
|
||||||
require.NoError(t,
|
require.NoError(t,
|
||||||
c.forEachRemoveCandidate(4, func(s string) error {
|
c.forEachRemoveCandidate(4, func(s string) error {
|
||||||
|
|
|
@ -50,19 +50,8 @@ func (np *Processor) processAddPeer(ev netmapEvent.AddPeer) {
|
||||||
})
|
})
|
||||||
nodeInfo.SetAttributes(a...)
|
nodeInfo.SetAttributes(a...)
|
||||||
|
|
||||||
keyString := hex.EncodeToString(nodeInfo.PublicKey())
|
// marshal updated node info structure
|
||||||
|
nodeInfoBinary, err := nodeInfo.Marshal()
|
||||||
exists := np.netmapSnapshot.touch(keyString, np.epochState.EpochCounter())
|
|
||||||
if !exists {
|
|
||||||
np.log.Info("approving network map candidate",
|
|
||||||
zap.String("key", keyString))
|
|
||||||
|
|
||||||
if nr := ev.NotaryRequest(); nr != nil {
|
|
||||||
// notary event case
|
|
||||||
|
|
||||||
var nodeInfoBinary []byte
|
|
||||||
|
|
||||||
nodeInfoBinary, err = nodeInfo.Marshal()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
np.log.Warn("could not marshal updated network map candidate",
|
np.log.Warn("could not marshal updated network map candidate",
|
||||||
zap.String("error", err.Error()),
|
zap.String("error", err.Error()),
|
||||||
|
@ -71,6 +60,15 @@ func (np *Processor) processAddPeer(ev netmapEvent.AddPeer) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
keyString := hex.EncodeToString(nodeInfo.PublicKey())
|
||||||
|
|
||||||
|
updated := np.netmapSnapshot.touch(keyString, np.epochState.EpochCounter(), nodeInfoBinary)
|
||||||
|
|
||||||
|
if updated {
|
||||||
|
np.log.Info("approving network map candidate",
|
||||||
|
zap.String("key", keyString))
|
||||||
|
|
||||||
|
if nr := ev.NotaryRequest(); nr != nil {
|
||||||
// create new notary request with the original nonce
|
// create new notary request with the original nonce
|
||||||
err = np.netmapClient.Morph().NotaryInvoke(
|
err = np.netmapClient.Morph().NotaryInvoke(
|
||||||
np.netmapClient.ContractAddress(),
|
np.netmapClient.ContractAddress(),
|
||||||
|
|
Loading…
Reference in a new issue