[#328] Change logic activating split-brain
Signed-off-by: Dmitriy Zayakin <d.zayakin@yadro.com>
This commit is contained in:
parent
24e1dfef28
commit
3dc7a5bdb0
2 changed files with 38 additions and 21 deletions
|
@ -4,16 +4,18 @@ from frostfs_testlib.storage.cluster import ClusterNode
|
|||
|
||||
class IpHelper:
|
||||
@staticmethod
|
||||
def drop_input_traffic_to_node(node: ClusterNode, block_ip: list[str]) -> None:
|
||||
def drop_input_traffic_to_node(node: ClusterNode, block_ip: list[tuple]) -> None:
|
||||
shell = node.host.get_shell()
|
||||
for ip in block_ip:
|
||||
shell.exec(f"ip route add blackhole {ip}")
|
||||
for ip, table in block_ip:
|
||||
if not table:
|
||||
shell.exec(f"ip r a blackhole {ip}")
|
||||
continue
|
||||
shell.exec(f"ip r a blackhole {ip} table {table}")
|
||||
|
||||
@staticmethod
|
||||
def restore_input_traffic_to_node(node: ClusterNode) -> None:
|
||||
shell = node.host.get_shell()
|
||||
unlock_ip = shell.exec("ip route list | grep blackhole", CommandOptions(check=False))
|
||||
if unlock_ip.return_code != 0:
|
||||
return
|
||||
for ip in unlock_ip.stdout.strip().split("\n"):
|
||||
shell.exec(f"ip route del blackhole {ip.split(' ')[1]}")
|
||||
unlock_ip = shell.exec("ip r l table all | grep blackhole", CommandOptions(check=False)).stdout
|
||||
|
||||
for active_blackhole in unlock_ip.strip().split("\n"):
|
||||
shell.exec(f"ip r d {active_blackhole}")
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
import datetime
|
||||
import itertools
|
||||
import logging
|
||||
import time
|
||||
from typing import TypeVar
|
||||
|
@ -39,7 +40,7 @@ class ClusterStateController:
|
|||
def __init__(self, shell: Shell, cluster: Cluster, healthcheck: Healthcheck) -> None:
|
||||
self.stopped_nodes: list[ClusterNode] = []
|
||||
self.detached_disks: dict[str, DiskController] = {}
|
||||
self.dropped_traffic: list[ClusterNode] = []
|
||||
self.dropped_traffic: set[ClusterNode] = set()
|
||||
self.excluded_from_netmap: list[StorageNode] = []
|
||||
self.stopped_services: set[NodeBase] = set()
|
||||
self.cluster = cluster
|
||||
|
@ -325,22 +326,22 @@ class ClusterStateController:
|
|||
|
||||
@reporter.step("Drop traffic to {node}, nodes - {block_nodes}")
|
||||
def drop_traffic(self, node: ClusterNode, wakeup_timeout: int, name_interface: str, block_nodes: list[ClusterNode] = None) -> None:
|
||||
list_ip = self._parse_interfaces(block_nodes, name_interface)
|
||||
IpHelper.drop_input_traffic_to_node(node, list_ip)
|
||||
interfaces_tables = self._parse_interfaces(block_nodes, name_interface)
|
||||
IpHelper.drop_input_traffic_to_node(node, interfaces_tables)
|
||||
time.sleep(wakeup_timeout)
|
||||
self.dropped_traffic.append(node)
|
||||
self.dropped_traffic.add(node)
|
||||
|
||||
@reporter.step("Start traffic to {node}")
|
||||
def restore_traffic(self, node: ClusterNode) -> None:
|
||||
IpHelper.restore_input_traffic_to_node(node=node)
|
||||
index = self.dropped_traffic.index(node)
|
||||
self.dropped_traffic.pop(index)
|
||||
self.dropped_traffic.discard(node)
|
||||
|
||||
@reporter.step("Restore blocked nodes")
|
||||
def restore_all_traffic(self):
|
||||
if not self.dropped_traffic:
|
||||
return
|
||||
parallel(self._restore_traffic_to_node, self.dropped_traffic)
|
||||
self.dropped_traffic.clear()
|
||||
|
||||
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
||||
@reporter.step("Hard reboot host {node} via magic SysRq option")
|
||||
|
@ -516,17 +517,31 @@ class ClusterStateController:
|
|||
|
||||
return disk_controller
|
||||
|
||||
@reporter.step("Restore traffic {node}")
|
||||
def _restore_traffic_to_node(self, node):
|
||||
IpHelper.restore_input_traffic_to_node(node)
|
||||
|
||||
def _parse_interfaces(self, nodes: list[ClusterNode], name_interface: str):
|
||||
interfaces = []
|
||||
def _parse_interfaces(self, nodes: list[ClusterNode], name_interface: str) -> list[tuple]:
|
||||
interfaces_and_tables = set()
|
||||
for node in nodes:
|
||||
dict_interfaces = node.host.config.interfaces
|
||||
for type, ip in dict_interfaces.items():
|
||||
if name_interface in type:
|
||||
interfaces.append(ip)
|
||||
return interfaces
|
||||
shell = node.host.get_shell()
|
||||
lines = shell.exec(f"ip r l table all | grep '{name_interface}'").stdout.splitlines()
|
||||
|
||||
ips = []
|
||||
tables = []
|
||||
|
||||
for line in lines:
|
||||
if "src" not in line or "table local" in line:
|
||||
continue
|
||||
parts = line.split()
|
||||
ips.append(parts[-1])
|
||||
if "table" in line:
|
||||
tables.append(parts[parts.index("table") + 1])
|
||||
tables.append(None)
|
||||
|
||||
[interfaces_and_tables.add((ip, table)) for ip, table in itertools.product(ips, tables)]
|
||||
|
||||
return interfaces_and_tables
|
||||
|
||||
@reporter.step("Ping node")
|
||||
def _ping_host(self, node: ClusterNode):
|
||||
|
|
Loading…
Reference in a new issue