from frostfs_testlib import reporter from frostfs_testlib.storage.cluster import ClusterNode from frostfs_testlib.testing.test_control import retry class IpTablesHelper: @staticmethod def drop_input_traffic_to_port(node: ClusterNode, ports: list[str]) -> None: shell = node.host.get_shell() for port in ports: shell.exec(f"iptables -A INPUT -p tcp --dport {port} -j DROP") @staticmethod def drop_input_traffic_to_node(node: ClusterNode, block_ip: list[str]) -> None: shell = node.host.get_shell() for ip in block_ip: shell.exec(f"iptables -A INPUT -s {ip} -j DROP") @staticmethod def restore_input_traffic_to_port(node: ClusterNode) -> None: shell = node.host.get_shell() ports = shell.exec("iptables -L --numeric | grep DROP | awk '{print $7}'").stdout.strip().split("\n") if ports[0] == "": return for port in ports: shell.exec(f"iptables -D INPUT -p tcp --dport {port.split(':')[-1]} -j DROP") @staticmethod def restore_input_traffic_to_node(node: ClusterNode) -> None: shell = node.host.get_shell() unlock_ip = shell.exec("iptables -L --numeric | grep DROP | awk '{print $4}'").stdout.strip().split("\n") if unlock_ip[0] == "": return for ip in unlock_ip: shell.exec(f"iptables -D INPUT -s {ip} -j DROP") # TODO Move class to HOST class IfUpDownHelper: @reporter.step("Down {interface} to {node}") def down_interface(self, node: ClusterNode, interface: str) -> None: shell = node.host.get_shell() shell.exec(f"ifdown {interface}") @reporter.step("Up {interface} to {node}") def up_interface(self, node: ClusterNode, interface: str) -> None: shell = node.host.get_shell() shell.exec(f"ifup {interface}") @reporter.step("Up all interface to {node}") def up_all_interface(self, node: ClusterNode) -> None: shell = node.host.get_shell() interfaces = list(node.host.config.interfaces.keys()) shell.exec("ifup -av") for name_interface in interfaces: self.check_state_up(node, name_interface) @reporter.step("Down all interface to {node}") def down_all_interface(self, node: ClusterNode) -> None: shell = node.host.get_shell() interfaces = list(node.host.config.interfaces.keys()) shell.exec("ifdown -av") for name_interface in interfaces: self.check_state_down(node, name_interface) @reporter.step("Check {node} to {interface}") def check_state(self, node: ClusterNode, interface: str) -> str: shell = node.host.get_shell() return shell.exec(f"ip link show {interface} | sed -z 's/.*state \(.*\) mode .*/\\1/'").stdout.strip() @retry(max_attempts=5, sleep_interval=5, expected_result="UP") def check_state_up(self, node: ClusterNode, interface: str) -> str: return self.check_state(node=node, interface=interface) @retry(max_attempts=5, sleep_interval=5, expected_result="DOWN") def check_state_down(self, node: ClusterNode, interface: str) -> str: return self.check_state(node=node, interface=interface)