WIP: netmap: remove dead nodes automatically #31
2 changed files with 129 additions and 50 deletions
|
@ -59,11 +59,14 @@ const (
|
||||||
balanceContractKey = "balanceScriptHash"
|
balanceContractKey = "balanceScriptHash"
|
||||||
|
|
||||||
cleanupEpochMethod = "newEpoch"
|
cleanupEpochMethod = "newEpoch"
|
||||||
|
|
||||||
|
ConfigCleanupDepthKey = "CleanupDepth"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
configPrefix = []byte("config")
|
configPrefix = []byte("config")
|
||||||
candidatePrefix = []byte("candidate")
|
candidatePrefix = []byte("candidate")
|
||||||
|
lastAlivePrefix = []byte("l")
|
||||||
)
|
)
|
||||||
|
|
||||||
// _deploy function sets up initial list of inner ring public keys.
|
// _deploy function sets up initial list of inner ring public keys.
|
||||||
|
@ -142,7 +145,7 @@ func AddPeerIR(nodeInfo []byte) {
|
||||||
|
|
||||||
common.CheckAlphabetWitness()
|
common.CheckAlphabetWitness()
|
||||||
|
|
||||||
publicKey := nodeInfo[2:35] // V2 format: offset:2, len:33
|
publicKey := pubkeyFromNodeInfo(nodeInfo)
|
||||||
|
|
||||||
addToNetmap(ctx, publicKey, Node{
|
addToNetmap(ctx, publicKey, Node{
|
||||||
BLOB: nodeInfo,
|
BLOB: nodeInfo,
|
||||||
|
@ -150,13 +153,16 @@ func AddPeerIR(nodeInfo []byte) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func pubkeyFromNodeInfo(nodeInfo []byte) []byte {
|
||||||
|
return nodeInfo[2:35] // V2 format: offset:2, len:33
|
||||||
|
}
|
||||||
|
|
||||||
// AddPeer accepts information about the network map candidate in the FrostFS
|
// AddPeer accepts information about the network map candidate in the FrostFS
|
||||||
// binary protocol format and does nothing. Keep method because storage node
|
// binary protocol format and does nothing. Keep method because storage node
|
||||||
// creates a notary transaction with this method, which produces a notary
|
// creates a notary transaction with this method, which produces a notary
|
||||||
// notification (implicit here).
|
// notification (implicit here).
|
||||||
func AddPeer(nodeInfo []byte) {
|
func AddPeer(nodeInfo []byte) {
|
||||||
// V2 format - offset:2, len:33
|
common.CheckWitness(pubkeyFromNodeInfo(nodeInfo))
|
||||||
common.CheckWitness(nodeInfo[2:35])
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -251,7 +257,7 @@ func NewEpoch(epochNum int) {
|
||||||
panic("invalid epoch") // ignore invocations with invalid epoch
|
panic("invalid epoch") // ignore invocations with invalid epoch
|
||||||
}
|
}
|
||||||
|
|
||||||
dataOnlineState := filterNetmap(ctx)
|
dataOnlineState := filterNetmap(ctx, epochNum)
|
||||||
|
|
||||||
runtime.Log("process new epoch")
|
runtime.Log("process new epoch")
|
||||||
|
|
||||||
|
@ -488,12 +494,18 @@ func addToNetmap(ctx storage.Context, publicKey []byte, node Node) {
|
||||||
storageKey := append(candidatePrefix, publicKey...)
|
storageKey := append(candidatePrefix, publicKey...)
|
||||||
storage.Put(ctx, storageKey, std.Serialize(node))
|
storage.Put(ctx, storageKey, std.Serialize(node))
|
||||||
|
|
||||||
|
storageKey = append(lastAlivePrefix, publicKey...)
|
||||||
|
storage.Put(ctx, storageKey, storage.Get(ctx, snapshotEpoch))
|
||||||
|
|
||||||
runtime.Notify("AddPeerSuccess", interop.PublicKey(publicKey))
|
runtime.Notify("AddPeerSuccess", interop.PublicKey(publicKey))
|
||||||
}
|
}
|
||||||
|
|
||||||
func removeFromNetmap(ctx storage.Context, key interop.PublicKey) {
|
func removeFromNetmap(ctx storage.Context, key interop.PublicKey) {
|
||||||
storageKey := append(candidatePrefix, key...)
|
storageKey := append(candidatePrefix, key...)
|
||||||
storage.Delete(ctx, storageKey)
|
storage.Delete(ctx, storageKey)
|
||||||
|
|
||||||
|
storageKey = append(lastAlivePrefix, key...)
|
||||||
|
storage.Delete(ctx, storageKey)
|
||||||
}
|
}
|
||||||
|
|
||||||
func updateNetmapState(ctx storage.Context, key interop.PublicKey, state NodeState) {
|
func updateNetmapState(ctx storage.Context, key interop.PublicKey, state NodeState) {
|
||||||
|
@ -507,17 +519,42 @@ func updateNetmapState(ctx storage.Context, key interop.PublicKey, state NodeSta
|
||||||
storage.Put(ctx, storageKey, std.Serialize(node))
|
storage.Put(ctx, storageKey, std.Serialize(node))
|
||||||
}
|
}
|
||||||
|
|
||||||
func filterNetmap(ctx storage.Context) []Node {
|
// filterNetmap filters candidates and creates netmap for the new epoch.
|
||||||
|
func filterNetmap(ctx storage.Context, newEpoch int) []Node {
|
||||||
var (
|
var (
|
||||||
netmap = getNetmapNodes(ctx)
|
netmap = getNetmapNodes(ctx)
|
||||||
result = []Node{}
|
result = []Node{}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
cleanupDepth := -1
|
||||||
|
value := getConfig(ctx, ConfigCleanupDepthKey)
|
||||||
|
if value != nil {
|
||||||
|
cleanupDepth = value.(int)
|
||||||
|
}
|
||||||
|
|
||||||
for i := 0; i < len(netmap); i++ {
|
for i := 0; i < len(netmap); i++ {
|
||||||
item := netmap[i]
|
item := netmap[i]
|
||||||
|
pub := pubkeyFromNodeInfo(item.BLOB)
|
||||||
|
|
||||||
if item.State != NodeStateOffline {
|
if item.State != NodeStateOffline {
|
||||||
result = append(result, item)
|
if cleanupDepth < 0 || item.State == NodeStateMaintenance {
|
||||||
|
result = append(result, item)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
key := append(lastAlivePrefix, pub...)
|
||||||
|
lastSeenEpoch := storage.Get(ctx, key).(int)
|
||||||
|
// Sanity check:
|
||||||
|
// Cleanup depth = 3 means that we are allowed to exist for 3 epochs without bootstrap.
|
||||||
|
// Let last seen epoch be 7 (the node sent bootstrap and appeared in the netmap for epoch 8).
|
||||||
|
// The condition is true if newEpoch = 8, 9, 10, 11, so epochs 9-11 are spent without bootstrap.
|
||||||
|
|||||||
|
if newEpoch-cleanupDepth <= lastSeenEpoch+1 {
|
||||||
|
result = append(result, item)
|
||||||
|
continue
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
removeFromNetmap(ctx, pub)
|
||||||
alexvanin
commented
Can you elaborate why it is required to remove pub key now? Cleanup depth affects `append` operation as expected, so it is a bit unexpected.
|
|||||||
}
|
}
|
||||||
|
|
||||||
return result
|
return result
|
||||||
|
|
|
@ -116,49 +116,53 @@ func TestNewEpoch(t *testing.T) {
|
||||||
cNm := newNetmapInvoker(t)
|
cNm := newNetmapInvoker(t)
|
||||||
nodes := make([][]testNodeInfo, epochCount)
|
nodes := make([][]testNodeInfo, epochCount)
|
||||||
for i := range nodes {
|
for i := range nodes {
|
||||||
size := rand.Int()%5 + 1
|
|
||||||
arr := make([]testNodeInfo, size)
|
|
||||||
for j := 0; j < size; j++ {
|
|
||||||
arr[j] = newStorageNode(t, cNm)
|
|
||||||
}
|
|
||||||
nodes[i] = arr
|
|
||||||
}
|
|
||||||
|
|
||||||
for i := 0; i < epochCount; i++ {
|
|
||||||
for _, tn := range nodes[i] {
|
|
||||||
cNm.WithSigners(tn.signer).Invoke(t, stackitem.Null{}, "addPeer", tn.raw)
|
|
||||||
cNm.Invoke(t, stackitem.Null{}, "addPeerIR", tn.raw)
|
|
||||||
}
|
|
||||||
|
|
||||||
if i > 0 {
|
if i > 0 {
|
||||||
// Remove random nodes from the previous netmap.
|
// Remove random nodes from the previous netmap.
|
||||||
current := make([]testNodeInfo, 0, len(nodes[i])+len(nodes[i-1]))
|
current := make([]testNodeInfo, 0, len(nodes[i])+len(nodes[i-1]))
|
||||||
current = append(current, nodes[i]...)
|
current = append(current, nodes[i]...)
|
||||||
|
|
||||||
for j := range nodes[i-1] {
|
for j := range nodes[i-1] {
|
||||||
if rand.Int()%3 == 0 {
|
if rand.Int()%3 != 0 {
|
||||||
cNm.Invoke(t, stackitem.Null{}, "updateStateIR",
|
|
||||||
int64(netmap.NodeStateOffline), nodes[i-1][j].pub)
|
|
||||||
} else {
|
|
||||||
current = append(current, nodes[i-1][j])
|
current = append(current, nodes[i-1][j])
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
nodes[i] = current
|
nodes[i] = current
|
||||||
}
|
}
|
||||||
|
|
||||||
|
size := rand.Int()%5 + 1
|
||||||
|
for j := 0; j < size; j++ {
|
||||||
|
nodes[i] = append(nodes[i], newStorageNode(t, cNm))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := 0; i < epochCount; i++ {
|
||||||
|
seen := make(map[string]bool)
|
||||||
|
for _, tn := range nodes[i] {
|
||||||
|
cNm.WithSigners(tn.signer).Invoke(t, stackitem.Null{}, "addPeer", tn.raw)
|
||||||
|
cNm.Invoke(t, stackitem.Null{}, "addPeerIR", tn.raw)
|
||||||
|
seen[string(tn.pub)] = true
|
||||||
|
}
|
||||||
|
|
||||||
|
if i > 0 {
|
||||||
|
// Remove random nodes from the previous netmap.
|
||||||
|
for j := range nodes[i-1] {
|
||||||
|
if !seen[string(nodes[i-1][j].pub)] {
|
||||||
|
cNm.Invoke(t, stackitem.Null{}, "updateStateIR",
|
||||||
|
int64(netmap.NodeStateOffline), nodes[i-1][j].pub)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
cNm.Invoke(t, stackitem.Null{}, "newEpoch", i+1)
|
cNm.Invoke(t, stackitem.Null{}, "newEpoch", i+1)
|
||||||
|
|
||||||
t.Logf("Epoch: %d, Netmap()", i)
|
t.Logf("Epoch: %d, Netmap()", i)
|
||||||
s, err := cNm.TestInvoke(t, "netmap")
|
checkSnapshotCurrent(t, cNm, nodes[i])
|
||||||
require.NoError(t, err)
|
|
||||||
require.Equal(t, 1, s.Len())
|
|
||||||
checkSnapshot(t, s, nodes[i])
|
|
||||||
|
|
||||||
for j := 0; j <= i && j < netmap.DefaultSnapshotCount; j++ {
|
for j := 0; j <= i && j < netmap.DefaultSnapshotCount; j++ {
|
||||||
t.Logf("Epoch: %d, diff: %d", i, j)
|
t.Logf("Epoch: %d, diff: %d", i, j)
|
||||||
checkSnapshotAt(t, j, cNm, nodes[i-j])
|
checkSnapshotAt(t, j, cNm, nodes[i-j])
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = cNm.TestInvoke(t, "snapshot", netmap.DefaultSnapshotCount)
|
_, err := cNm.TestInvoke(t, "snapshot", netmap.DefaultSnapshotCount)
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
require.True(t, strings.Contains(err.Error(), "incorrect diff"))
|
require.True(t, strings.Contains(err.Error(), "incorrect diff"))
|
||||||
|
|
||||||
|
@ -168,6 +172,47 @@ func TestNewEpoch(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestNewEpochCleanupNodes(t *testing.T) {
|
||||||
|
rand.Seed(42)
|
||||||
|
|
||||||
|
const cleanupDepth = 3
|
||||||
|
|
||||||
|
cNm := newNetmapInvoker(t, netmap.ConfigCleanupDepthKey, int64(cleanupDepth))
|
||||||
|
nodes := make([]testNodeInfo, 4)
|
||||||
|
for i := range nodes {
|
||||||
|
nodes[i] = newStorageNode(t, cNm)
|
||||||
|
}
|
||||||
|
|
||||||
|
addNodes := func(i int) {
|
||||||
|
for _, tn := range nodes[i:] {
|
||||||
|
cNm.WithSigners(tn.signer).Invoke(t, stackitem.Null{}, "addPeer", tn.raw)
|
||||||
|
cNm.Invoke(t, stackitem.Null{}, "addPeerIR", tn.raw)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
addNodes(0)
|
||||||
|
cNm.Invoke(t, stackitem.Null{}, "newEpoch", 1)
|
||||||
|
checkSnapshotCurrent(t, cNm, nodes)
|
||||||
|
|
||||||
|
// The first and the second node stop sending bootstrap requests
|
||||||
|
// in epochs 2, 3, 4.
|
||||||
|
// The second node is also moved to maintenance.
|
||||||
|
cNm.Invoke(t, stackitem.Null{}, "updateStateIR", int64(netmap.NodeStateMaintenance), nodes[1].pub)
|
||||||
|
nodes[1].state = netmap.NodeStateMaintenance
|
||||||
|
|
||||||
|
for i := 2; i < 5; i++ {
|
||||||
|
addNodes(2)
|
||||||
|
cNm.Invoke(t, stackitem.Null{}, "newEpoch", i)
|
||||||
|
checkSnapshotCurrent(t, cNm, nodes)
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := 5; i < 8; i++ {
|
||||||
|
addNodes(2)
|
||||||
|
cNm.Invoke(t, stackitem.Null{}, "newEpoch", i)
|
||||||
|
checkSnapshotCurrent(t, cNm, nodes[1:]) // The first node removed, the second is in the maintenance.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestUpdateSnapshotCount(t *testing.T) {
|
func TestUpdateSnapshotCount(t *testing.T) {
|
||||||
rand.Seed(42)
|
rand.Seed(42)
|
||||||
|
|
||||||
|
@ -181,7 +226,9 @@ func TestUpdateSnapshotCount(t *testing.T) {
|
||||||
for i := 1; i < len(nodes); i++ {
|
for i := 1; i < len(nodes); i++ {
|
||||||
sn := newStorageNode(t, cNm)
|
sn := newStorageNode(t, cNm)
|
||||||
nodes[i] = append(nodes[i-1], sn)
|
nodes[i] = append(nodes[i-1], sn)
|
||||||
cNm.Invoke(t, stackitem.Null{}, "addPeerIR", sn.raw)
|
for j := range nodes[i] {
|
||||||
|
cNm.Invoke(t, stackitem.Null{}, "addPeerIR", nodes[i][j].raw)
|
||||||
|
}
|
||||||
cNm.Invoke(t, stackitem.Null{}, "newEpoch", i+1)
|
cNm.Invoke(t, stackitem.Null{}, "newEpoch", i+1)
|
||||||
}
|
}
|
||||||
return nodes
|
return nodes
|
||||||
|
@ -198,17 +245,14 @@ func TestUpdateSnapshotCount(t *testing.T) {
|
||||||
const newCount = netmap.DefaultSnapshotCount + 3
|
const newCount = netmap.DefaultSnapshotCount + 3
|
||||||
cNm.Invoke(t, stackitem.Null{}, "updateSnapshotCount", newCount)
|
cNm.Invoke(t, stackitem.Null{}, "updateSnapshotCount", newCount)
|
||||||
|
|
||||||
s, err := cNm.TestInvoke(t, "netmap")
|
checkSnapshotCurrent(t, cNm, nodes[epochCount-1])
|
||||||
require.NoError(t, err)
|
|
||||||
require.Equal(t, 1, s.Len())
|
|
||||||
checkSnapshot(t, s, nodes[epochCount-1])
|
|
||||||
for i := 0; i < epochCount; i++ {
|
for i := 0; i < epochCount; i++ {
|
||||||
checkSnapshotAt(t, i, cNm, nodes[epochCount-i-1])
|
checkSnapshotAt(t, i, cNm, nodes[epochCount-i-1])
|
||||||
}
|
}
|
||||||
for i := epochCount; i < newCount; i++ {
|
for i := epochCount; i < newCount; i++ {
|
||||||
checkSnapshotAt(t, i, cNm, nil)
|
checkSnapshotAt(t, i, cNm, nil)
|
||||||
}
|
}
|
||||||
_, err = cNm.TestInvoke(t, "snapshot", int64(newCount))
|
_, err := cNm.TestInvoke(t, "snapshot", int64(newCount))
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
})
|
})
|
||||||
t.Run("increase size, copy old snapshots", func(t *testing.T) {
|
t.Run("increase size, copy old snapshots", func(t *testing.T) {
|
||||||
|
@ -222,17 +266,14 @@ func TestUpdateSnapshotCount(t *testing.T) {
|
||||||
const newCount = netmap.DefaultSnapshotCount + 3
|
const newCount = netmap.DefaultSnapshotCount + 3
|
||||||
cNm.Invoke(t, stackitem.Null{}, "updateSnapshotCount", newCount)
|
cNm.Invoke(t, stackitem.Null{}, "updateSnapshotCount", newCount)
|
||||||
|
|
||||||
s, err := cNm.TestInvoke(t, "netmap")
|
checkSnapshotCurrent(t, cNm, nodes[epochCount-1])
|
||||||
require.NoError(t, err)
|
|
||||||
require.Equal(t, 1, s.Len())
|
|
||||||
checkSnapshot(t, s, nodes[epochCount-1])
|
|
||||||
for i := 0; i < newCount-3; i++ {
|
for i := 0; i < newCount-3; i++ {
|
||||||
checkSnapshotAt(t, i, cNm, nodes[epochCount-i-1])
|
checkSnapshotAt(t, i, cNm, nodes[epochCount-i-1])
|
||||||
}
|
}
|
||||||
for i := newCount - 3; i < newCount; i++ {
|
for i := newCount - 3; i < newCount; i++ {
|
||||||
checkSnapshotAt(t, i, cNm, nil)
|
checkSnapshotAt(t, i, cNm, nil)
|
||||||
}
|
}
|
||||||
_, err = cNm.TestInvoke(t, "snapshot", int64(newCount))
|
_, err := cNm.TestInvoke(t, "snapshot", int64(newCount))
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
})
|
})
|
||||||
t.Run("decrease size, small decrease", func(t *testing.T) {
|
t.Run("decrease size, small decrease", func(t *testing.T) {
|
||||||
|
@ -246,14 +287,11 @@ func TestUpdateSnapshotCount(t *testing.T) {
|
||||||
const newCount = netmap.DefaultSnapshotCount/2 + 2
|
const newCount = netmap.DefaultSnapshotCount/2 + 2
|
||||||
cNm.Invoke(t, stackitem.Null{}, "updateSnapshotCount", newCount)
|
cNm.Invoke(t, stackitem.Null{}, "updateSnapshotCount", newCount)
|
||||||
|
|
||||||
s, err := cNm.TestInvoke(t, "netmap")
|
checkSnapshotCurrent(t, cNm, nodes[epochCount-1])
|
||||||
require.NoError(t, err)
|
|
||||||
require.Equal(t, 1, s.Len())
|
|
||||||
checkSnapshot(t, s, nodes[epochCount-1])
|
|
||||||
for i := 0; i < newCount; i++ {
|
for i := 0; i < newCount; i++ {
|
||||||
checkSnapshotAt(t, i, cNm, nodes[epochCount-i-1])
|
checkSnapshotAt(t, i, cNm, nodes[epochCount-i-1])
|
||||||
}
|
}
|
||||||
_, err = cNm.TestInvoke(t, "snapshot", int64(newCount))
|
_, err := cNm.TestInvoke(t, "snapshot", int64(newCount))
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
})
|
})
|
||||||
t.Run("decrease size, big decrease", func(t *testing.T) {
|
t.Run("decrease size, big decrease", func(t *testing.T) {
|
||||||
|
@ -267,18 +305,22 @@ func TestUpdateSnapshotCount(t *testing.T) {
|
||||||
const newCount = netmap.DefaultSnapshotCount/2 - 2
|
const newCount = netmap.DefaultSnapshotCount/2 - 2
|
||||||
cNm.Invoke(t, stackitem.Null{}, "updateSnapshotCount", newCount)
|
cNm.Invoke(t, stackitem.Null{}, "updateSnapshotCount", newCount)
|
||||||
|
|
||||||
s, err := cNm.TestInvoke(t, "netmap")
|
checkSnapshotCurrent(t, cNm, nodes[epochCount-1])
|
||||||
require.NoError(t, err)
|
|
||||||
require.Equal(t, 1, s.Len())
|
|
||||||
checkSnapshot(t, s, nodes[epochCount-1])
|
|
||||||
for i := 0; i < newCount; i++ {
|
for i := 0; i < newCount; i++ {
|
||||||
checkSnapshotAt(t, i, cNm, nodes[epochCount-i-1])
|
checkSnapshotAt(t, i, cNm, nodes[epochCount-i-1])
|
||||||
}
|
}
|
||||||
_, err = cNm.TestInvoke(t, "snapshot", int64(newCount))
|
_, err := cNm.TestInvoke(t, "snapshot", int64(newCount))
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func checkSnapshotCurrent(t *testing.T, cNm *neotest.ContractInvoker, nodes []testNodeInfo) {
|
||||||
|
s, err := cNm.TestInvoke(t, "netmap")
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, 1, s.Len())
|
||||||
|
checkSnapshot(t, s, nodes)
|
||||||
|
}
|
||||||
|
|
||||||
func checkSnapshotAt(t *testing.T, epoch int, cNm *neotest.ContractInvoker, nodes []testNodeInfo) {
|
func checkSnapshotAt(t *testing.T, epoch int, cNm *neotest.ContractInvoker, nodes []testNodeInfo) {
|
||||||
s, err := cNm.TestInvoke(t, "snapshot", int64(epoch))
|
s, err := cNm.TestInvoke(t, "snapshot", int64(epoch))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
Loading…
Reference in a new issue
Spent some time to verify, but it looks correct to me.