WIP: netmap: remove dead nodes automatically #31
2 changed files with 106 additions and 22 deletions
@@ -59,11 +59,14 @@ const (
     balanceContractKey = "balanceScriptHash"

     cleanupEpochMethod = "newEpoch"
+
+    ConfigCleanupDepthKey = "CleanupDepth"
 )

 var (
     configPrefix = []byte("config")
     candidatePrefix = []byte("candidate")
+    lastAlivePrefix = []byte("l")
 )

 // _deploy function sets up initial list of inner ring public keys.
@@ -254,7 +257,7 @@ func NewEpoch(epochNum int) {
         panic("invalid epoch") // ignore invocations with invalid epoch
     }

-    dataOnlineState := filterNetmap(ctx)
+    dataOnlineState := filterNetmap(ctx, epochNum)

     runtime.Log("process new epoch")

@@ -491,12 +494,18 @@ func addToNetmap(ctx storage.Context, publicKey []byte, node Node) {
     storageKey := append(candidatePrefix, publicKey...)
     storage.Put(ctx, storageKey, std.Serialize(node))

+    storageKey = append(lastAlivePrefix, publicKey...)
+    storage.Put(ctx, storageKey, storage.Get(ctx, snapshotEpoch))
+
     runtime.Notify("AddPeerSuccess", interop.PublicKey(publicKey))
 }

 func removeFromNetmap(ctx storage.Context, key interop.PublicKey) {
     storageKey := append(candidatePrefix, key...)
     storage.Delete(ctx, storageKey)
+
+    storageKey = append(lastAlivePrefix, key...)
+    storage.Delete(ctx, storageKey)
 }

 func updateNetmapState(ctx storage.Context, key interop.PublicKey, state NodeState) {
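The net effect of the two functions above: every successful bootstrap writes a "last alive" record under lastAlivePrefix plus the node's public key, holding the epoch that was current at addPeer time, and removing a candidate drops that record as well. A minimal off-chain sketch of this bookkeeping, using a plain Go map in place of contract storage; the names and key encoding here are illustrative, not the contract's API:

package main

import "fmt"

// store stands in for contract storage; keys are strings for simplicity.
type store map[string]int

// addToNetmap mirrors the new bookkeeping: record the current epoch under "l"+pubkey.
func addToNetmap(s store, pub string, currentEpoch int) {
    s["l"+pub] = currentEpoch
}

// removeFromNetmap drops the last-alive record together with the candidate.
func removeFromNetmap(s store, pub string) {
    delete(s, "l"+pub)
}

func main() {
    s := store{}
    addToNetmap(s, "03b209", 7) // node bootstraps while epoch 7 is current
    fmt.Println(s)              // map[l03b209:7]
    removeFromNetmap(s, "03b209")
    fmt.Println(s) // map[]
}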
@@ -510,17 +519,42 @@ func updateNetmapState(ctx storage.Context, key interop.PublicKey, state NodeState) {
     storage.Put(ctx, storageKey, std.Serialize(node))
 }

-func filterNetmap(ctx storage.Context) []Node {
+// filterNetmap filters candidates and creates netmap for the new epoch.
+func filterNetmap(ctx storage.Context, newEpoch int) []Node {
     var (
         netmap = getNetmapNodes(ctx)
         result = []Node{}
     )

+    cleanupDepth := -1
+    value := getConfig(ctx, ConfigCleanupDepthKey)
+    if value != nil {
+        cleanupDepth = value.(int)
+    }
+
     for i := 0; i < len(netmap); i++ {
         item := netmap[i]
+        pub := pubkeyFromNodeInfo(item.BLOB)
+
         if item.State != NodeStateOffline {
-            result = append(result, item)
+            if cleanupDepth < 0 || item.State == NodeStateMaintenance {
+                result = append(result, item)
+                continue
+            }
+
+            key := append(lastAlivePrefix, pub...)
+            lastSeenEpoch := storage.Get(ctx, key).(int)
+            // Sanity check:
+            // Cleanup depth = 3 means that a node is allowed to stay for 3 epochs without a bootstrap.
+            // Let the last seen epoch be 7 (the node sent a bootstrap and appeared in the netmap for epoch 8).
+            // The condition is true for newEpoch = 8, 9, 10, 11, so epochs 9-11 are spent without a bootstrap.
+            if newEpoch-cleanupDepth <= lastSeenEpoch+1 {
+                result = append(result, item)
+                continue
+            }
         }
+
+        removeFromNetmap(ctx, pub)
     }

     return result

alexvanin commented (on the removeFromNetmap call above):
Can you elaborate why it is required to remove the pub key here? Cleanup depth affects the `append` operation as expected, so this removal is a bit unexpected.
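To make the sanity check concrete, here is a standalone evaluation of the retention condition with the same numbers as the comment (cleanupDepth = 3, lastSeenEpoch = 7); the node is kept for newEpoch 8 through 11 and dropped starting with 12:

package main

import "fmt"

func main() {
    // Numbers taken from the sanity-check comment in filterNetmap.
    const cleanupDepth, lastSeenEpoch = 3, 7

    for newEpoch := 8; newEpoch <= 12; newEpoch++ {
        kept := newEpoch-cleanupDepth <= lastSeenEpoch+1
        fmt.Printf("newEpoch=%d kept=%v\n", newEpoch, kept)
    }
    // Output: kept for 8, 9, 10, 11; dropped starting with 12.
}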
@@ -116,35 +116,42 @@ func TestNewEpoch(t *testing.T) {
     cNm := newNetmapInvoker(t)
     nodes := make([][]testNodeInfo, epochCount)
     for i := range nodes {
-        size := rand.Int()%5 + 1
-        arr := make([]testNodeInfo, size)
-        for j := 0; j < size; j++ {
-            arr[j] = newStorageNode(t, cNm)
-        }
-        nodes[i] = arr
-    }
-
-    for i := 0; i < epochCount; i++ {
-        for _, tn := range nodes[i] {
-            cNm.WithSigners(tn.signer).Invoke(t, stackitem.Null{}, "addPeer", tn.raw)
-            cNm.Invoke(t, stackitem.Null{}, "addPeerIR", tn.raw)
-        }
-
         if i > 0 {
             // Remove random nodes from the previous netmap.
             current := make([]testNodeInfo, 0, len(nodes[i])+len(nodes[i-1]))
             current = append(current, nodes[i]...)

             for j := range nodes[i-1] {
-                if rand.Int()%3 == 0 {
-                    cNm.Invoke(t, stackitem.Null{}, "updateStateIR",
-                        int64(netmap.NodeStateOffline), nodes[i-1][j].pub)
-                } else {
+                if rand.Int()%3 != 0 {
                     current = append(current, nodes[i-1][j])
                 }
             }
             nodes[i] = current
         }
+
+        size := rand.Int()%5 + 1
+        for j := 0; j < size; j++ {
+            nodes[i] = append(nodes[i], newStorageNode(t, cNm))
+        }
+    }
+
+    for i := 0; i < epochCount; i++ {
+        seen := make(map[string]bool)
+        for _, tn := range nodes[i] {
+            cNm.WithSigners(tn.signer).Invoke(t, stackitem.Null{}, "addPeer", tn.raw)
+            cNm.Invoke(t, stackitem.Null{}, "addPeerIR", tn.raw)
+            seen[string(tn.pub)] = true
+        }
+
+        if i > 0 {
+            // Remove random nodes from the previous netmap.
+            for j := range nodes[i-1] {
+                if !seen[string(nodes[i-1][j].pub)] {
+                    cNm.Invoke(t, stackitem.Null{}, "updateStateIR",
+                        int64(netmap.NodeStateOffline), nodes[i-1][j].pub)
+                }
+            }
+        }
         cNm.Invoke(t, stackitem.Null{}, "newEpoch", i+1)

         t.Logf("Epoch: %d, Netmap()", i)
@@ -165,6 +172,47 @@ func TestNewEpoch(t *testing.T) {
     }
 }

+func TestNewEpochCleanupNodes(t *testing.T) {
+    rand.Seed(42)
+
+    const cleanupDepth = 3
+
+    cNm := newNetmapInvoker(t, netmap.ConfigCleanupDepthKey, int64(cleanupDepth))
+    nodes := make([]testNodeInfo, 4)
+    for i := range nodes {
+        nodes[i] = newStorageNode(t, cNm)
+    }
+
+    addNodes := func(i int) {
+        for _, tn := range nodes[i:] {
+            cNm.WithSigners(tn.signer).Invoke(t, stackitem.Null{}, "addPeer", tn.raw)
+            cNm.Invoke(t, stackitem.Null{}, "addPeerIR", tn.raw)
+        }
+    }
+
+    addNodes(0)
+    cNm.Invoke(t, stackitem.Null{}, "newEpoch", 1)
+    checkSnapshotCurrent(t, cNm, nodes)
+
+    // The first and the second node stop sending bootstrap requests
+    // in epochs 2, 3, 4.
+    // The second node is also moved to maintenance.
+    cNm.Invoke(t, stackitem.Null{}, "updateStateIR", int64(netmap.NodeStateMaintenance), nodes[1].pub)
+    nodes[1].state = netmap.NodeStateMaintenance
+
+    for i := 2; i < 5; i++ {
+        addNodes(2)
+        cNm.Invoke(t, stackitem.Null{}, "newEpoch", i)
+        checkSnapshotCurrent(t, cNm, nodes)
+    }
+
+    for i := 5; i < 8; i++ {
+        addNodes(2)
+        cNm.Invoke(t, stackitem.Null{}, "newEpoch", i)
+        checkSnapshotCurrent(t, cNm, nodes[1:]) // The first node is removed, the second is in maintenance.
+    }
+}
+
 func TestUpdateSnapshotCount(t *testing.T) {
     rand.Seed(42)

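A quick check of the epochs used in this test, assuming the last-alive epoch recorded for nodes[0] when it bootstraps before the first newEpoch call is 0. With cleanupDepth = 3 the retention condition holds for epochs 2 through 4 and fails from epoch 5 on, which is exactly where the test switches to expecting nodes[1:]:

package main

import "fmt"

func main() {
    // Assumption: the last-alive record written for nodes[0] at bootstrap,
    // before "newEpoch" 1 is invoked, holds epoch 0.
    const cleanupDepth, lastSeenEpoch = 3, 0

    for newEpoch := 2; newEpoch <= 7; newEpoch++ {
        kept := newEpoch-cleanupDepth <= lastSeenEpoch+1
        fmt.Printf("newEpoch=%d nodes[0] kept=%v\n", newEpoch, kept)
    }
    // Kept for epochs 2-4, dropped from epoch 5 on; nodes[1] stays regardless
    // because the maintenance state bypasses the check.
}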
@@ -178,7 +226,9 @@ func TestUpdateSnapshotCount(t *testing.T) {
     for i := 1; i < len(nodes); i++ {
         sn := newStorageNode(t, cNm)
         nodes[i] = append(nodes[i-1], sn)
-        cNm.Invoke(t, stackitem.Null{}, "addPeerIR", sn.raw)
+        for j := range nodes[i] {
+            cNm.Invoke(t, stackitem.Null{}, "addPeerIR", nodes[i][j].raw)
+        }
         cNm.Invoke(t, stackitem.Null{}, "newEpoch", i+1)
     }
     return nodes
Spent some time to verify, but it looks correct to me.