WIP: netmap: remove dead nodes automatically #31

Closed
fyrchik wants to merge 3 commits from fyrchik/frostfs-contract:fix-netmap-cleaner into master
2 changed files with 129 additions and 50 deletions

View file

@ -59,11 +59,14 @@ const (
balanceContractKey = "balanceScriptHash"
cleanupEpochMethod = "newEpoch"
ConfigCleanupDepthKey = "CleanupDepth"
)
var (
configPrefix = []byte("config")
candidatePrefix = []byte("candidate")
lastAlivePrefix = []byte("l")
)
// _deploy function sets up initial list of inner ring public keys.
@ -142,7 +145,7 @@ func AddPeerIR(nodeInfo []byte) {
common.CheckAlphabetWitness()
publicKey := nodeInfo[2:35] // V2 format: offset:2, len:33
publicKey := pubkeyFromNodeInfo(nodeInfo)
addToNetmap(ctx, publicKey, Node{
BLOB: nodeInfo,
@ -150,13 +153,16 @@ func AddPeerIR(nodeInfo []byte) {
})
}
func pubkeyFromNodeInfo(nodeInfo []byte) []byte {
return nodeInfo[2:35] // V2 format: offset:2, len:33
}
// AddPeer accepts information about the network map candidate in the FrostFS
// binary protocol format and does nothing. Keep method because storage node
// creates a notary transaction with this method, which produces a notary
// notification (implicit here).
func AddPeer(nodeInfo []byte) {
// V2 format - offset:2, len:33
common.CheckWitness(nodeInfo[2:35])
common.CheckWitness(pubkeyFromNodeInfo(nodeInfo))
return
}
@ -251,7 +257,7 @@ func NewEpoch(epochNum int) {
panic("invalid epoch") // ignore invocations with invalid epoch
}
dataOnlineState := filterNetmap(ctx)
dataOnlineState := filterNetmap(ctx, epochNum)
runtime.Log("process new epoch")
@ -488,12 +494,18 @@ func addToNetmap(ctx storage.Context, publicKey []byte, node Node) {
storageKey := append(candidatePrefix, publicKey...)
storage.Put(ctx, storageKey, std.Serialize(node))
storageKey = append(lastAlivePrefix, publicKey...)
storage.Put(ctx, storageKey, storage.Get(ctx, snapshotEpoch))
runtime.Notify("AddPeerSuccess", interop.PublicKey(publicKey))
}
func removeFromNetmap(ctx storage.Context, key interop.PublicKey) {
storageKey := append(candidatePrefix, key...)
storage.Delete(ctx, storageKey)
storageKey = append(lastAlivePrefix, key...)
storage.Delete(ctx, storageKey)
}
func updateNetmapState(ctx storage.Context, key interop.PublicKey, state NodeState) {
@ -507,17 +519,42 @@ func updateNetmapState(ctx storage.Context, key interop.PublicKey, state NodeSta
storage.Put(ctx, storageKey, std.Serialize(node))
}
func filterNetmap(ctx storage.Context) []Node {
// filterNetmap filters candidates and creates netmap for the new epoch.
func filterNetmap(ctx storage.Context, newEpoch int) []Node {
var (
netmap = getNetmapNodes(ctx)
result = []Node{}
)
cleanupDepth := -1
value := getConfig(ctx, ConfigCleanupDepthKey)
if value != nil {
cleanupDepth = value.(int)
}
for i := 0; i < len(netmap); i++ {
item := netmap[i]
pub := pubkeyFromNodeInfo(item.BLOB)
if item.State != NodeStateOffline {
result = append(result, item)
if cleanupDepth < 0 || item.State == NodeStateMaintenance {
result = append(result, item)
continue
}
key := append(lastAlivePrefix, pub...)
lastSeenEpoch := storage.Get(ctx, key).(int)
// Sanity check:
// Cleanup depth = 3 means that we are allowed to exist for 3 epochs without bootstrap.
// Let last seen epoch be 7 (the node sent bootstrap and appeared in the netmap for 8 epoch).
// The condition is true if newEpoch = 8, 9, 10, 11, so epochs 9-11 are spent without bootstrap.
Review

Spent some time to verify, but it looks correct to me.

Spent some time to verify, but it looks correct to me.
if newEpoch-cleanupDepth <= lastSeenEpoch+1 {
result = append(result, item)
continue
}
}
removeFromNetmap(ctx, pub)
Review

Can you elaborate why it is required to remove pub key now? Cleanup depth affects append operation as expected, so it is a bit unexpected.

Can you elaborate why it is required to remove pub key now? Cleanup depth affects `append` operation as expected, so it is a bit unexpected.
}
return result

View file

@ -116,49 +116,53 @@ func TestNewEpoch(t *testing.T) {
cNm := newNetmapInvoker(t)
nodes := make([][]testNodeInfo, epochCount)
for i := range nodes {
size := rand.Int()%5 + 1
arr := make([]testNodeInfo, size)
for j := 0; j < size; j++ {
arr[j] = newStorageNode(t, cNm)
}
nodes[i] = arr
}
for i := 0; i < epochCount; i++ {
for _, tn := range nodes[i] {
cNm.WithSigners(tn.signer).Invoke(t, stackitem.Null{}, "addPeer", tn.raw)
cNm.Invoke(t, stackitem.Null{}, "addPeerIR", tn.raw)
}
if i > 0 {
// Remove random nodes from the previous netmap.
current := make([]testNodeInfo, 0, len(nodes[i])+len(nodes[i-1]))
current = append(current, nodes[i]...)
for j := range nodes[i-1] {
if rand.Int()%3 == 0 {
cNm.Invoke(t, stackitem.Null{}, "updateStateIR",
int64(netmap.NodeStateOffline), nodes[i-1][j].pub)
} else {
if rand.Int()%3 != 0 {
current = append(current, nodes[i-1][j])
}
}
nodes[i] = current
}
size := rand.Int()%5 + 1
for j := 0; j < size; j++ {
nodes[i] = append(nodes[i], newStorageNode(t, cNm))
}
}
for i := 0; i < epochCount; i++ {
seen := make(map[string]bool)
for _, tn := range nodes[i] {
cNm.WithSigners(tn.signer).Invoke(t, stackitem.Null{}, "addPeer", tn.raw)
cNm.Invoke(t, stackitem.Null{}, "addPeerIR", tn.raw)
seen[string(tn.pub)] = true
}
if i > 0 {
// Remove random nodes from the previous netmap.
for j := range nodes[i-1] {
if !seen[string(nodes[i-1][j].pub)] {
cNm.Invoke(t, stackitem.Null{}, "updateStateIR",
int64(netmap.NodeStateOffline), nodes[i-1][j].pub)
}
}
}
cNm.Invoke(t, stackitem.Null{}, "newEpoch", i+1)
t.Logf("Epoch: %d, Netmap()", i)
s, err := cNm.TestInvoke(t, "netmap")
require.NoError(t, err)
require.Equal(t, 1, s.Len())
checkSnapshot(t, s, nodes[i])
checkSnapshotCurrent(t, cNm, nodes[i])
for j := 0; j <= i && j < netmap.DefaultSnapshotCount; j++ {
t.Logf("Epoch: %d, diff: %d", i, j)
checkSnapshotAt(t, j, cNm, nodes[i-j])
}
_, err = cNm.TestInvoke(t, "snapshot", netmap.DefaultSnapshotCount)
_, err := cNm.TestInvoke(t, "snapshot", netmap.DefaultSnapshotCount)
require.Error(t, err)
require.True(t, strings.Contains(err.Error(), "incorrect diff"))
@ -168,6 +172,47 @@ func TestNewEpoch(t *testing.T) {
}
}
func TestNewEpochCleanupNodes(t *testing.T) {
rand.Seed(42)
const cleanupDepth = 3
cNm := newNetmapInvoker(t, netmap.ConfigCleanupDepthKey, int64(cleanupDepth))
nodes := make([]testNodeInfo, 4)
for i := range nodes {
nodes[i] = newStorageNode(t, cNm)
}
addNodes := func(i int) {
for _, tn := range nodes[i:] {
cNm.WithSigners(tn.signer).Invoke(t, stackitem.Null{}, "addPeer", tn.raw)
cNm.Invoke(t, stackitem.Null{}, "addPeerIR", tn.raw)
}
}
addNodes(0)
cNm.Invoke(t, stackitem.Null{}, "newEpoch", 1)
checkSnapshotCurrent(t, cNm, nodes)
// The first and the second node stop sending bootstrap requests
// in epochs 2, 3, 4.
// The second node is also moved to maintenance.
cNm.Invoke(t, stackitem.Null{}, "updateStateIR", int64(netmap.NodeStateMaintenance), nodes[1].pub)
nodes[1].state = netmap.NodeStateMaintenance
for i := 2; i < 5; i++ {
addNodes(2)
cNm.Invoke(t, stackitem.Null{}, "newEpoch", i)
checkSnapshotCurrent(t, cNm, nodes)
}
for i := 5; i < 8; i++ {
addNodes(2)
cNm.Invoke(t, stackitem.Null{}, "newEpoch", i)
checkSnapshotCurrent(t, cNm, nodes[1:]) // The first node removed, the second is in the maintenance.
}
}
func TestUpdateSnapshotCount(t *testing.T) {
rand.Seed(42)
@ -181,7 +226,9 @@ func TestUpdateSnapshotCount(t *testing.T) {
for i := 1; i < len(nodes); i++ {
sn := newStorageNode(t, cNm)
nodes[i] = append(nodes[i-1], sn)
cNm.Invoke(t, stackitem.Null{}, "addPeerIR", sn.raw)
for j := range nodes[i] {
cNm.Invoke(t, stackitem.Null{}, "addPeerIR", nodes[i][j].raw)
}
cNm.Invoke(t, stackitem.Null{}, "newEpoch", i+1)
}
return nodes
@ -198,17 +245,14 @@ func TestUpdateSnapshotCount(t *testing.T) {
const newCount = netmap.DefaultSnapshotCount + 3
cNm.Invoke(t, stackitem.Null{}, "updateSnapshotCount", newCount)
s, err := cNm.TestInvoke(t, "netmap")
require.NoError(t, err)
require.Equal(t, 1, s.Len())
checkSnapshot(t, s, nodes[epochCount-1])
checkSnapshotCurrent(t, cNm, nodes[epochCount-1])
for i := 0; i < epochCount; i++ {
checkSnapshotAt(t, i, cNm, nodes[epochCount-i-1])
}
for i := epochCount; i < newCount; i++ {
checkSnapshotAt(t, i, cNm, nil)
}
_, err = cNm.TestInvoke(t, "snapshot", int64(newCount))
_, err := cNm.TestInvoke(t, "snapshot", int64(newCount))
require.Error(t, err)
})
t.Run("increase size, copy old snapshots", func(t *testing.T) {
@ -222,17 +266,14 @@ func TestUpdateSnapshotCount(t *testing.T) {
const newCount = netmap.DefaultSnapshotCount + 3
cNm.Invoke(t, stackitem.Null{}, "updateSnapshotCount", newCount)
s, err := cNm.TestInvoke(t, "netmap")
require.NoError(t, err)
require.Equal(t, 1, s.Len())
checkSnapshot(t, s, nodes[epochCount-1])
checkSnapshotCurrent(t, cNm, nodes[epochCount-1])
for i := 0; i < newCount-3; i++ {
checkSnapshotAt(t, i, cNm, nodes[epochCount-i-1])
}
for i := newCount - 3; i < newCount; i++ {
checkSnapshotAt(t, i, cNm, nil)
}
_, err = cNm.TestInvoke(t, "snapshot", int64(newCount))
_, err := cNm.TestInvoke(t, "snapshot", int64(newCount))
require.Error(t, err)
})
t.Run("decrease size, small decrease", func(t *testing.T) {
@ -246,14 +287,11 @@ func TestUpdateSnapshotCount(t *testing.T) {
const newCount = netmap.DefaultSnapshotCount/2 + 2
cNm.Invoke(t, stackitem.Null{}, "updateSnapshotCount", newCount)
s, err := cNm.TestInvoke(t, "netmap")
require.NoError(t, err)
require.Equal(t, 1, s.Len())
checkSnapshot(t, s, nodes[epochCount-1])
checkSnapshotCurrent(t, cNm, nodes[epochCount-1])
for i := 0; i < newCount; i++ {
checkSnapshotAt(t, i, cNm, nodes[epochCount-i-1])
}
_, err = cNm.TestInvoke(t, "snapshot", int64(newCount))
_, err := cNm.TestInvoke(t, "snapshot", int64(newCount))
require.Error(t, err)
})
t.Run("decrease size, big decrease", func(t *testing.T) {
@ -267,18 +305,22 @@ func TestUpdateSnapshotCount(t *testing.T) {
const newCount = netmap.DefaultSnapshotCount/2 - 2
cNm.Invoke(t, stackitem.Null{}, "updateSnapshotCount", newCount)
s, err := cNm.TestInvoke(t, "netmap")
require.NoError(t, err)
require.Equal(t, 1, s.Len())
checkSnapshot(t, s, nodes[epochCount-1])
checkSnapshotCurrent(t, cNm, nodes[epochCount-1])
for i := 0; i < newCount; i++ {
checkSnapshotAt(t, i, cNm, nodes[epochCount-i-1])
}
_, err = cNm.TestInvoke(t, "snapshot", int64(newCount))
_, err := cNm.TestInvoke(t, "snapshot", int64(newCount))
require.Error(t, err)
})
}
func checkSnapshotCurrent(t *testing.T, cNm *neotest.ContractInvoker, nodes []testNodeInfo) {
s, err := cNm.TestInvoke(t, "netmap")
require.NoError(t, err)
require.Equal(t, 1, s.Len())
checkSnapshot(t, s, nodes)
}
func checkSnapshotAt(t *testing.T, epoch int, cNm *neotest.ContractInvoker, nodes []testNodeInfo) {
s, err := cNm.TestInvoke(t, "snapshot", int64(epoch))
require.NoError(t, err)