from frostfs_testlib.shell import Shell
from frostfs_testlib.storage.cluster import ClusterNode


class IpTablesHelper:
    """Static helpers that add and remove iptables DROP rules via a cluster node's host shell."""

    @staticmethod
    def drop_input_traffic_to_port(node: ClusterNode, ports: list[str]) -> None:
        """Append an INPUT DROP rule for TCP traffic to each destination port.

        Args:
            node: Cluster node whose host shell executes the iptables commands.
            ports: Destination ports to block; one rule is appended per port.
        """
        shell = node.host.get_shell()
        for port in ports:
            shell.exec(f"iptables -A INPUT -p tcp --dport {port} -j DROP")

    @staticmethod
    def drop_input_traffic_to_node(node: ClusterNode, block_ip: list[str]) -> None:
        """Append an INPUT DROP rule for traffic coming from each source address.

        Args:
            node: Cluster node whose host shell executes the iptables commands.
            block_ip: Source addresses to block; one rule is appended per address.
        """
        shell = node.host.get_shell()
        for ip in block_ip:
            shell.exec(f"iptables -A INPUT -s {ip} -j DROP")

    @staticmethod
    def restore_input_traffic_to_port(node: ClusterNode) -> None:
        """Delete the INPUT DROP rules for every port found in the current rule listing.

        The 7th whitespace-separated field of each listed DROP line is used as
        the port token; only the part after the last ``:`` is passed to
        ``--dport``. Does nothing when the listing yields no tokens.

        Args:
            node: Cluster node whose host shell executes the iptables commands.
        """
        shell = node.host.get_shell()
        # Lines without a 7th field produce blank awk output; the helper drops
        # them so no delete command is issued with an empty --dport value.
        for port in IpTablesHelper._drop_rule_column(shell, 7):
            shell.exec(f"iptables -D INPUT -p tcp --dport {port.split(':')[-1]} -j DROP")

    @staticmethod
    def restore_input_traffic_to_node(node: ClusterNode) -> None:
        """Delete the INPUT DROP rules for every source address found in the current rule listing.

        The 4th whitespace-separated field of each listed DROP line is used as
        the source address. Does nothing when the listing yields no addresses.

        Args:
            node: Cluster node whose host shell executes the iptables commands.
        """
        shell = node.host.get_shell()
        # NOTE(review): every line containing DROP contributes its 4th field here,
        # including rules created by drop_input_traffic_to_port - confirm that the
        # resulting delete is expected to match a rule in that case.
        for ip in IpTablesHelper._drop_rule_column(shell, 4):
            shell.exec(f"iptables -D INPUT -s {ip} -j DROP")

    @staticmethod
    def _drop_rule_column(shell: Shell, column: int) -> list[str]:
        """Return the non-blank values of one awk column from the listed DROP lines.

        Args:
            shell: Shell used to run the listing pipeline.
            column: 1-based awk field number to print for each matching line.

        Returns:
            The stripped, non-empty output lines, in listing order; an empty
            list when nothing was printed.
        """
        listing = shell.exec(
            f"iptables -L --numeric | grep DROP | awk '{{print ${column}}}'"
        ).stdout
        # Filter every blank line, not just a leading one: awk prints an empty
        # line for each matching row that has fewer fields than requested.
        return [line.strip() for line in listing.splitlines() if line.strip()]