[#328] Change logic activating split-brain
Some checks failed
DCO action / DCO (pull_request) Has been cancelled

Signed-off-by: Dmitriy Zayakin <d.zayakin@yadro.com>
This commit is contained in:
Dmitriy Zayakin 2024-11-28 16:43:46 +03:00
parent 0c9660fffc
commit 77a9f26916
2 changed files with 38 additions and 21 deletions

View file

@ -4,16 +4,18 @@ from frostfs_testlib.storage.cluster import ClusterNode
class IpHelper: class IpHelper:
@staticmethod @staticmethod
def drop_input_traffic_to_node(node: ClusterNode, block_ip: list[str]) -> None: def drop_input_traffic_to_node(node: ClusterNode, block_ip: list[tuple]) -> None:
shell = node.host.get_shell() shell = node.host.get_shell()
for ip in block_ip: for ip, table in block_ip:
shell.exec(f"ip route add blackhole {ip}") if not table:
shell.exec(f"ip r a blackhole {ip}")
continue
shell.exec(f"ip r a blackhole {ip} table {table}")
@staticmethod @staticmethod
def restore_input_traffic_to_node(node: ClusterNode) -> None: def restore_input_traffic_to_node(node: ClusterNode) -> None:
shell = node.host.get_shell() shell = node.host.get_shell()
unlock_ip = shell.exec("ip route list | grep blackhole", CommandOptions(check=False)) unlock_ip = shell.exec("ip r l table all | grep blackhole", CommandOptions(check=False)).stdout
if unlock_ip.return_code != 0:
return for active_blackhole in unlock_ip.strip().split("\n"):
for ip in unlock_ip.stdout.strip().split("\n"): shell.exec(f"ip r d {active_blackhole}")
shell.exec(f"ip route del blackhole {ip.split(' ')[1]}")

View file

@ -1,4 +1,5 @@
import datetime import datetime
import itertools
import logging import logging
import time import time
from typing import TypeVar from typing import TypeVar
@ -39,7 +40,7 @@ class ClusterStateController:
def __init__(self, shell: Shell, cluster: Cluster, healthcheck: Healthcheck) -> None: def __init__(self, shell: Shell, cluster: Cluster, healthcheck: Healthcheck) -> None:
self.stopped_nodes: list[ClusterNode] = [] self.stopped_nodes: list[ClusterNode] = []
self.detached_disks: dict[str, DiskController] = {} self.detached_disks: dict[str, DiskController] = {}
self.dropped_traffic: list[ClusterNode] = [] self.dropped_traffic: set[ClusterNode] = set()
self.excluded_from_netmap: list[StorageNode] = [] self.excluded_from_netmap: list[StorageNode] = []
self.stopped_services: set[NodeBase] = set() self.stopped_services: set[NodeBase] = set()
self.cluster = cluster self.cluster = cluster
@ -325,22 +326,22 @@ class ClusterStateController:
@reporter.step("Drop traffic to {node}, nodes - {block_nodes}") @reporter.step("Drop traffic to {node}, nodes - {block_nodes}")
def drop_traffic(self, node: ClusterNode, wakeup_timeout: int, name_interface: str, block_nodes: list[ClusterNode] = None) -> None: def drop_traffic(self, node: ClusterNode, wakeup_timeout: int, name_interface: str, block_nodes: list[ClusterNode] = None) -> None:
list_ip = self._parse_interfaces(block_nodes, name_interface) interfaces_tables = self._parse_interfaces(block_nodes, name_interface)
IpHelper.drop_input_traffic_to_node(node, list_ip) IpHelper.drop_input_traffic_to_node(node, interfaces_tables)
time.sleep(wakeup_timeout) time.sleep(wakeup_timeout)
self.dropped_traffic.append(node) self.dropped_traffic.add(node)
@reporter.step("Start traffic to {node}") @reporter.step("Start traffic to {node}")
def restore_traffic(self, node: ClusterNode) -> None: def restore_traffic(self, node: ClusterNode) -> None:
IpHelper.restore_input_traffic_to_node(node=node) IpHelper.restore_input_traffic_to_node(node=node)
index = self.dropped_traffic.index(node) self.dropped_traffic.discard(node)
self.dropped_traffic.pop(index)
@reporter.step("Restore blocked nodes") @reporter.step("Restore blocked nodes")
def restore_all_traffic(self): def restore_all_traffic(self):
if not self.dropped_traffic: if not self.dropped_traffic:
return return
parallel(self._restore_traffic_to_node, self.dropped_traffic) parallel(self._restore_traffic_to_node, self.dropped_traffic)
self.dropped_traffic.clear()
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step("Hard reboot host {node} via magic SysRq option") @reporter.step("Hard reboot host {node} via magic SysRq option")
@ -516,17 +517,31 @@ class ClusterStateController:
return disk_controller return disk_controller
@reporter.step("Restore traffic {node}")
def _restore_traffic_to_node(self, node): def _restore_traffic_to_node(self, node):
IpHelper.restore_input_traffic_to_node(node) IpHelper.restore_input_traffic_to_node(node)
def _parse_interfaces(self, nodes: list[ClusterNode], name_interface: str): def _parse_interfaces(self, nodes: list[ClusterNode], name_interface: str) -> list[tuple]:
interfaces = [] interfaces_and_tables = set()
for node in nodes: for node in nodes:
dict_interfaces = node.host.config.interfaces shell = node.host.get_shell()
for type, ip in dict_interfaces.items(): lines = shell.exec(f"ip r l table all | grep '{name_interface}'").stdout.splitlines()
if name_interface in type:
interfaces.append(ip) ips = []
return interfaces tables = []
for line in lines:
if "src" not in line or "table local" in line:
continue
parts = line.split()
ips.append(parts[-1])
if "table" in line:
tables.append(parts[parts.index("table") + 1])
tables.append(None)
[interfaces_and_tables.add((ip, table)) for ip, table in itertools.product(ips, tables)]
return interfaces_and_tables
@reporter.step("Ping node") @reporter.step("Ping node")
def _ping_host(self, node: ClusterNode): def _ping_host(self, node: ClusterNode):