WIP: netmap: remove dead nodes automatically #31
2 changed files with 129 additions and 50 deletions
|
@ -59,11 +59,14 @@ const (
|
|||
balanceContractKey = "balanceScriptHash"
|
||||
|
||||
cleanupEpochMethod = "newEpoch"
|
||||
|
||||
ConfigCleanupDepthKey = "CleanupDepth"
|
||||
)
|
||||
|
||||
var (
|
||||
configPrefix = []byte("config")
|
||||
candidatePrefix = []byte("candidate")
|
||||
lastAlivePrefix = []byte("l")
|
||||
)
|
||||
|
||||
// _deploy function sets up initial list of inner ring public keys.
|
||||
|
@ -142,7 +145,7 @@ func AddPeerIR(nodeInfo []byte) {
|
|||
|
||||
common.CheckAlphabetWitness()
|
||||
|
||||
publicKey := nodeInfo[2:35] // V2 format: offset:2, len:33
|
||||
publicKey := pubkeyFromNodeInfo(nodeInfo)
|
||||
|
||||
addToNetmap(ctx, publicKey, Node{
|
||||
BLOB: nodeInfo,
|
||||
|
@ -150,13 +153,16 @@ func AddPeerIR(nodeInfo []byte) {
|
|||
})
|
||||
}
|
||||
|
||||
func pubkeyFromNodeInfo(nodeInfo []byte) []byte {
|
||||
return nodeInfo[2:35] // V2 format: offset:2, len:33
|
||||
}
|
||||
|
||||
// AddPeer accepts information about the network map candidate in the FrostFS
|
||||
// binary protocol format and does nothing. Keep method because storage node
|
||||
// creates a notary transaction with this method, which produces a notary
|
||||
// notification (implicit here).
|
||||
func AddPeer(nodeInfo []byte) {
|
||||
// V2 format - offset:2, len:33
|
||||
common.CheckWitness(nodeInfo[2:35])
|
||||
common.CheckWitness(pubkeyFromNodeInfo(nodeInfo))
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -251,7 +257,7 @@ func NewEpoch(epochNum int) {
|
|||
panic("invalid epoch") // ignore invocations with invalid epoch
|
||||
}
|
||||
|
||||
dataOnlineState := filterNetmap(ctx)
|
||||
dataOnlineState := filterNetmap(ctx, epochNum)
|
||||
|
||||
runtime.Log("process new epoch")
|
||||
|
||||
|
@ -488,12 +494,18 @@ func addToNetmap(ctx storage.Context, publicKey []byte, node Node) {
|
|||
storageKey := append(candidatePrefix, publicKey...)
|
||||
storage.Put(ctx, storageKey, std.Serialize(node))
|
||||
|
||||
storageKey = append(lastAlivePrefix, publicKey...)
|
||||
storage.Put(ctx, storageKey, storage.Get(ctx, snapshotEpoch))
|
||||
|
||||
runtime.Notify("AddPeerSuccess", interop.PublicKey(publicKey))
|
||||
}
|
||||
|
||||
func removeFromNetmap(ctx storage.Context, key interop.PublicKey) {
|
||||
storageKey := append(candidatePrefix, key...)
|
||||
storage.Delete(ctx, storageKey)
|
||||
|
||||
storageKey = append(lastAlivePrefix, key...)
|
||||
storage.Delete(ctx, storageKey)
|
||||
}
|
||||
|
||||
func updateNetmapState(ctx storage.Context, key interop.PublicKey, state NodeState) {
|
||||
|
@ -507,17 +519,42 @@ func updateNetmapState(ctx storage.Context, key interop.PublicKey, state NodeSta
|
|||
storage.Put(ctx, storageKey, std.Serialize(node))
|
||||
}
|
||||
|
||||
func filterNetmap(ctx storage.Context) []Node {
|
||||
// filterNetmap filters candidates and creates netmap for the new epoch.
|
||||
func filterNetmap(ctx storage.Context, newEpoch int) []Node {
|
||||
var (
|
||||
netmap = getNetmapNodes(ctx)
|
||||
result = []Node{}
|
||||
)
|
||||
|
||||
cleanupDepth := -1
|
||||
value := getConfig(ctx, ConfigCleanupDepthKey)
|
||||
if value != nil {
|
||||
cleanupDepth = value.(int)
|
||||
}
|
||||
|
||||
for i := 0; i < len(netmap); i++ {
|
||||
item := netmap[i]
|
||||
pub := pubkeyFromNodeInfo(item.BLOB)
|
||||
|
||||
if item.State != NodeStateOffline {
|
||||
result = append(result, item)
|
||||
if cleanupDepth < 0 || item.State == NodeStateMaintenance {
|
||||
result = append(result, item)
|
||||
continue
|
||||
}
|
||||
|
||||
key := append(lastAlivePrefix, pub...)
|
||||
lastSeenEpoch := storage.Get(ctx, key).(int)
|
||||
// Sanity check:
|
||||
// Cleanup depth = 3 means that we are allowed to exist for 3 epochs without bootstrap.
|
||||
// Let last seen epoch be 7 (the node sent bootstrap and appeared in the netmap for 8 epoch).
|
||||
// The condition is true if newEpoch = 8, 9, 10, 11, so epochs 9-11 are spent without bootstrap.
|
||||
|
||||
if newEpoch-cleanupDepth <= lastSeenEpoch+1 {
|
||||
result = append(result, item)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
removeFromNetmap(ctx, pub)
|
||||
alexvanin
commented
Can you elaborate why it is required to remove pub key now? Cleanup depth affects Can you elaborate why it is required to remove pub key now? Cleanup depth affects `append` operation as expected, so it is a bit unexpected.
|
||||
}
|
||||
|
||||
return result
|
||||
|
|
|
@ -116,49 +116,53 @@ func TestNewEpoch(t *testing.T) {
|
|||
cNm := newNetmapInvoker(t)
|
||||
nodes := make([][]testNodeInfo, epochCount)
|
||||
for i := range nodes {
|
||||
size := rand.Int()%5 + 1
|
||||
arr := make([]testNodeInfo, size)
|
||||
for j := 0; j < size; j++ {
|
||||
arr[j] = newStorageNode(t, cNm)
|
||||
}
|
||||
nodes[i] = arr
|
||||
}
|
||||
|
||||
for i := 0; i < epochCount; i++ {
|
||||
for _, tn := range nodes[i] {
|
||||
cNm.WithSigners(tn.signer).Invoke(t, stackitem.Null{}, "addPeer", tn.raw)
|
||||
cNm.Invoke(t, stackitem.Null{}, "addPeerIR", tn.raw)
|
||||
}
|
||||
|
||||
if i > 0 {
|
||||
// Remove random nodes from the previous netmap.
|
||||
current := make([]testNodeInfo, 0, len(nodes[i])+len(nodes[i-1]))
|
||||
current = append(current, nodes[i]...)
|
||||
|
||||
for j := range nodes[i-1] {
|
||||
if rand.Int()%3 == 0 {
|
||||
cNm.Invoke(t, stackitem.Null{}, "updateStateIR",
|
||||
int64(netmap.NodeStateOffline), nodes[i-1][j].pub)
|
||||
} else {
|
||||
if rand.Int()%3 != 0 {
|
||||
current = append(current, nodes[i-1][j])
|
||||
}
|
||||
}
|
||||
nodes[i] = current
|
||||
}
|
||||
|
||||
size := rand.Int()%5 + 1
|
||||
for j := 0; j < size; j++ {
|
||||
nodes[i] = append(nodes[i], newStorageNode(t, cNm))
|
||||
}
|
||||
}
|
||||
|
||||
for i := 0; i < epochCount; i++ {
|
||||
seen := make(map[string]bool)
|
||||
for _, tn := range nodes[i] {
|
||||
cNm.WithSigners(tn.signer).Invoke(t, stackitem.Null{}, "addPeer", tn.raw)
|
||||
cNm.Invoke(t, stackitem.Null{}, "addPeerIR", tn.raw)
|
||||
seen[string(tn.pub)] = true
|
||||
}
|
||||
|
||||
if i > 0 {
|
||||
// Remove random nodes from the previous netmap.
|
||||
for j := range nodes[i-1] {
|
||||
if !seen[string(nodes[i-1][j].pub)] {
|
||||
cNm.Invoke(t, stackitem.Null{}, "updateStateIR",
|
||||
int64(netmap.NodeStateOffline), nodes[i-1][j].pub)
|
||||
}
|
||||
}
|
||||
}
|
||||
cNm.Invoke(t, stackitem.Null{}, "newEpoch", i+1)
|
||||
|
||||
t.Logf("Epoch: %d, Netmap()", i)
|
||||
s, err := cNm.TestInvoke(t, "netmap")
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 1, s.Len())
|
||||
checkSnapshot(t, s, nodes[i])
|
||||
checkSnapshotCurrent(t, cNm, nodes[i])
|
||||
|
||||
for j := 0; j <= i && j < netmap.DefaultSnapshotCount; j++ {
|
||||
t.Logf("Epoch: %d, diff: %d", i, j)
|
||||
checkSnapshotAt(t, j, cNm, nodes[i-j])
|
||||
}
|
||||
|
||||
_, err = cNm.TestInvoke(t, "snapshot", netmap.DefaultSnapshotCount)
|
||||
_, err := cNm.TestInvoke(t, "snapshot", netmap.DefaultSnapshotCount)
|
||||
require.Error(t, err)
|
||||
require.True(t, strings.Contains(err.Error(), "incorrect diff"))
|
||||
|
||||
|
@ -168,6 +172,47 @@ func TestNewEpoch(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestNewEpochCleanupNodes(t *testing.T) {
|
||||
rand.Seed(42)
|
||||
|
||||
const cleanupDepth = 3
|
||||
|
||||
cNm := newNetmapInvoker(t, netmap.ConfigCleanupDepthKey, int64(cleanupDepth))
|
||||
nodes := make([]testNodeInfo, 4)
|
||||
for i := range nodes {
|
||||
nodes[i] = newStorageNode(t, cNm)
|
||||
}
|
||||
|
||||
addNodes := func(i int) {
|
||||
for _, tn := range nodes[i:] {
|
||||
cNm.WithSigners(tn.signer).Invoke(t, stackitem.Null{}, "addPeer", tn.raw)
|
||||
cNm.Invoke(t, stackitem.Null{}, "addPeerIR", tn.raw)
|
||||
}
|
||||
}
|
||||
|
||||
addNodes(0)
|
||||
cNm.Invoke(t, stackitem.Null{}, "newEpoch", 1)
|
||||
checkSnapshotCurrent(t, cNm, nodes)
|
||||
|
||||
// The first and the second node stop sending bootstrap requests
|
||||
// in epochs 2, 3, 4.
|
||||
// The second node is also moved to maintenance.
|
||||
cNm.Invoke(t, stackitem.Null{}, "updateStateIR", int64(netmap.NodeStateMaintenance), nodes[1].pub)
|
||||
nodes[1].state = netmap.NodeStateMaintenance
|
||||
|
||||
for i := 2; i < 5; i++ {
|
||||
addNodes(2)
|
||||
cNm.Invoke(t, stackitem.Null{}, "newEpoch", i)
|
||||
checkSnapshotCurrent(t, cNm, nodes)
|
||||
}
|
||||
|
||||
for i := 5; i < 8; i++ {
|
||||
addNodes(2)
|
||||
cNm.Invoke(t, stackitem.Null{}, "newEpoch", i)
|
||||
checkSnapshotCurrent(t, cNm, nodes[1:]) // The first node removed, the second is in the maintenance.
|
||||
}
|
||||
}
|
||||
|
||||
func TestUpdateSnapshotCount(t *testing.T) {
|
||||
rand.Seed(42)
|
||||
|
||||
|
@ -181,7 +226,9 @@ func TestUpdateSnapshotCount(t *testing.T) {
|
|||
for i := 1; i < len(nodes); i++ {
|
||||
sn := newStorageNode(t, cNm)
|
||||
nodes[i] = append(nodes[i-1], sn)
|
||||
cNm.Invoke(t, stackitem.Null{}, "addPeerIR", sn.raw)
|
||||
for j := range nodes[i] {
|
||||
cNm.Invoke(t, stackitem.Null{}, "addPeerIR", nodes[i][j].raw)
|
||||
}
|
||||
cNm.Invoke(t, stackitem.Null{}, "newEpoch", i+1)
|
||||
}
|
||||
return nodes
|
||||
|
@ -198,17 +245,14 @@ func TestUpdateSnapshotCount(t *testing.T) {
|
|||
const newCount = netmap.DefaultSnapshotCount + 3
|
||||
cNm.Invoke(t, stackitem.Null{}, "updateSnapshotCount", newCount)
|
||||
|
||||
s, err := cNm.TestInvoke(t, "netmap")
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 1, s.Len())
|
||||
checkSnapshot(t, s, nodes[epochCount-1])
|
||||
checkSnapshotCurrent(t, cNm, nodes[epochCount-1])
|
||||
for i := 0; i < epochCount; i++ {
|
||||
checkSnapshotAt(t, i, cNm, nodes[epochCount-i-1])
|
||||
}
|
||||
for i := epochCount; i < newCount; i++ {
|
||||
checkSnapshotAt(t, i, cNm, nil)
|
||||
}
|
||||
_, err = cNm.TestInvoke(t, "snapshot", int64(newCount))
|
||||
_, err := cNm.TestInvoke(t, "snapshot", int64(newCount))
|
||||
require.Error(t, err)
|
||||
})
|
||||
t.Run("increase size, copy old snapshots", func(t *testing.T) {
|
||||
|
@ -222,17 +266,14 @@ func TestUpdateSnapshotCount(t *testing.T) {
|
|||
const newCount = netmap.DefaultSnapshotCount + 3
|
||||
cNm.Invoke(t, stackitem.Null{}, "updateSnapshotCount", newCount)
|
||||
|
||||
s, err := cNm.TestInvoke(t, "netmap")
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 1, s.Len())
|
||||
checkSnapshot(t, s, nodes[epochCount-1])
|
||||
checkSnapshotCurrent(t, cNm, nodes[epochCount-1])
|
||||
for i := 0; i < newCount-3; i++ {
|
||||
checkSnapshotAt(t, i, cNm, nodes[epochCount-i-1])
|
||||
}
|
||||
for i := newCount - 3; i < newCount; i++ {
|
||||
checkSnapshotAt(t, i, cNm, nil)
|
||||
}
|
||||
_, err = cNm.TestInvoke(t, "snapshot", int64(newCount))
|
||||
_, err := cNm.TestInvoke(t, "snapshot", int64(newCount))
|
||||
require.Error(t, err)
|
||||
})
|
||||
t.Run("decrease size, small decrease", func(t *testing.T) {
|
||||
|
@ -246,14 +287,11 @@ func TestUpdateSnapshotCount(t *testing.T) {
|
|||
const newCount = netmap.DefaultSnapshotCount/2 + 2
|
||||
cNm.Invoke(t, stackitem.Null{}, "updateSnapshotCount", newCount)
|
||||
|
||||
s, err := cNm.TestInvoke(t, "netmap")
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 1, s.Len())
|
||||
checkSnapshot(t, s, nodes[epochCount-1])
|
||||
checkSnapshotCurrent(t, cNm, nodes[epochCount-1])
|
||||
for i := 0; i < newCount; i++ {
|
||||
checkSnapshotAt(t, i, cNm, nodes[epochCount-i-1])
|
||||
}
|
||||
_, err = cNm.TestInvoke(t, "snapshot", int64(newCount))
|
||||
_, err := cNm.TestInvoke(t, "snapshot", int64(newCount))
|
||||
require.Error(t, err)
|
||||
})
|
||||
t.Run("decrease size, big decrease", func(t *testing.T) {
|
||||
|
@ -267,18 +305,22 @@ func TestUpdateSnapshotCount(t *testing.T) {
|
|||
const newCount = netmap.DefaultSnapshotCount/2 - 2
|
||||
cNm.Invoke(t, stackitem.Null{}, "updateSnapshotCount", newCount)
|
||||
|
||||
s, err := cNm.TestInvoke(t, "netmap")
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 1, s.Len())
|
||||
checkSnapshot(t, s, nodes[epochCount-1])
|
||||
checkSnapshotCurrent(t, cNm, nodes[epochCount-1])
|
||||
for i := 0; i < newCount; i++ {
|
||||
checkSnapshotAt(t, i, cNm, nodes[epochCount-i-1])
|
||||
}
|
||||
_, err = cNm.TestInvoke(t, "snapshot", int64(newCount))
|
||||
_, err := cNm.TestInvoke(t, "snapshot", int64(newCount))
|
||||
require.Error(t, err)
|
||||
})
|
||||
}
|
||||
|
||||
func checkSnapshotCurrent(t *testing.T, cNm *neotest.ContractInvoker, nodes []testNodeInfo) {
|
||||
s, err := cNm.TestInvoke(t, "netmap")
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 1, s.Len())
|
||||
checkSnapshot(t, s, nodes)
|
||||
}
|
||||
|
||||
func checkSnapshotAt(t *testing.T, epoch int, cNm *neotest.ContractInvoker, nodes []testNodeInfo) {
|
||||
s, err := cNm.TestInvoke(t, "snapshot", int64(epoch))
|
||||
require.NoError(t, err)
|
||||
|
|
Loading…
Reference in a new issue
Spent some time to verify, but it looks correct to me.