[#143] Change network utils

Signed-off-by: Dmitriy Zayakin <d.zayakin@yadro.com>
This commit is contained in:
Dmitriy Zayakin 2023-12-12 09:38:38 +03:00
parent 81dfc723da
commit cfe234c6da
4 changed files with 48 additions and 120 deletions

View file

@ -5,6 +5,7 @@ from typing import Optional
from frostfs_testlib.hosting.config import CLIConfig, HostConfig, ServiceConfig from frostfs_testlib.hosting.config import CLIConfig, HostConfig, ServiceConfig
from frostfs_testlib.shell.interfaces import Shell from frostfs_testlib.shell.interfaces import Shell
from frostfs_testlib.testing.readable import HumanReadableEnum from frostfs_testlib.testing.readable import HumanReadableEnum
from frostfs_testlib.testing.test_control import retry
class HostStatus(HumanReadableEnum): class HostStatus(HumanReadableEnum):
@ -25,9 +26,7 @@ class Host(ABC):
def __init__(self, config: HostConfig) -> None: def __init__(self, config: HostConfig) -> None:
self._config = config self._config = config
self._service_config_by_name = { self._service_config_by_name = {service_config.name: service_config for service_config in config.services}
service_config.name: service_config for service_config in config.services
}
self._cli_config_by_name = {cli_config.name: cli_config for cli_config in config.clis} self._cli_config_by_name = {cli_config.name: cli_config for cli_config in config.clis}
@property @property
@ -323,9 +322,7 @@ class Host(ABC):
""" """
@abstractmethod @abstractmethod
def wait_for_service_to_be_in_state( def wait_for_service_to_be_in_state(self, systemd_service_name: str, expected_state: str, timeout: int) -> None:
self, systemd_service_name: str, expected_state: str, timeout: int
) -> None:
""" """
Waites for service to be in specified state. Waites for service to be in specified state.
@ -335,3 +332,23 @@ class Host(ABC):
timeout: Seconds to wait timeout: Seconds to wait
""" """
def down_interface(self, interface: str) -> None:
shell = self.get_shell()
shell.exec(f"ip link set {interface} down")
def up_interface(self, interface: str) -> None:
shell = self.get_shell()
shell.exec(f"ip link set {interface} up")
def check_state(self, interface: str) -> str:
shell = self.get_shell()
return shell.exec(f"ip link show {interface} | sed -z 's/.*state \(.*\) mode .*/\\1/'").stdout.strip()
@retry(max_attempts=5, sleep_interval=5, expected_result="UP")
def check_state_up(self, interface: str) -> str:
return self.check_state(interface=interface)
@retry(max_attempts=5, sleep_interval=5, expected_result="DOWN")
def check_state_down(self, interface: str) -> str:
return self.check_state(interface=interface)

View file

@ -9,7 +9,7 @@ class SudoInspector(CommandInspector):
def inspect(self, original_command: str, command: str) -> str: def inspect(self, original_command: str, command: str) -> str:
if not command.startswith("sudo"): if not command.startswith("sudo"):
return f"sudo {command}" return f"sudo -i {command}"
return command return command

View file

@ -1,77 +1,19 @@
from frostfs_testlib import reporter from frostfs_testlib.shell import CommandOptions
from frostfs_testlib.storage.cluster import ClusterNode from frostfs_testlib.storage.cluster import ClusterNode
from frostfs_testlib.testing.test_control import retry
class IpTablesHelper: class IpHelper:
@staticmethod
def drop_input_traffic_to_port(node: ClusterNode, ports: list[str]) -> None:
shell = node.host.get_shell()
for port in ports:
shell.exec(f"iptables -A INPUT -p tcp --dport {port} -j DROP")
@staticmethod @staticmethod
def drop_input_traffic_to_node(node: ClusterNode, block_ip: list[str]) -> None: def drop_input_traffic_to_node(node: ClusterNode, block_ip: list[str]) -> None:
shell = node.host.get_shell() shell = node.host.get_shell()
for ip in block_ip: for ip in block_ip:
shell.exec(f"iptables -A INPUT -s {ip} -j DROP") shell.exec(f"ip route add blackhole {ip}")
@staticmethod
def restore_input_traffic_to_port(node: ClusterNode) -> None:
shell = node.host.get_shell()
ports = shell.exec("iptables -L --numeric | grep DROP | awk '{print $7}'").stdout.strip().split("\n")
if ports[0] == "":
return
for port in ports:
shell.exec(f"iptables -D INPUT -p tcp --dport {port.split(':')[-1]} -j DROP")
@staticmethod @staticmethod
def restore_input_traffic_to_node(node: ClusterNode) -> None: def restore_input_traffic_to_node(node: ClusterNode) -> None:
shell = node.host.get_shell() shell = node.host.get_shell()
unlock_ip = shell.exec("iptables -L --numeric | grep DROP | awk '{print $4}'").stdout.strip().split("\n") unlock_ip = shell.exec("ip route list | grep blackhole", CommandOptions(check=False))
if unlock_ip[0] == "": if unlock_ip.return_code != 0:
return return
for ip in unlock_ip: for ip in unlock_ip.stdout.strip().split("\n"):
shell.exec(f"iptables -D INPUT -s {ip} -j DROP") shell.exec(f"ip route del blackhole {ip.split(' ')[1]}")
# TODO Move class to HOST
class IfUpDownHelper:
@reporter.step("Down {interface} to {node}")
def down_interface(self, node: ClusterNode, interface: str) -> None:
shell = node.host.get_shell()
shell.exec(f"ifdown {interface}")
@reporter.step("Up {interface} to {node}")
def up_interface(self, node: ClusterNode, interface: str) -> None:
shell = node.host.get_shell()
shell.exec(f"ifup {interface}")
@reporter.step("Up all interface to {node}")
def up_all_interface(self, node: ClusterNode) -> None:
shell = node.host.get_shell()
interfaces = list(node.host.config.interfaces.keys())
shell.exec("ifup -av")
for name_interface in interfaces:
self.check_state_up(node, name_interface)
@reporter.step("Down all interface to {node}")
def down_all_interface(self, node: ClusterNode) -> None:
shell = node.host.get_shell()
interfaces = list(node.host.config.interfaces.keys())
shell.exec("ifdown -av")
for name_interface in interfaces:
self.check_state_down(node, name_interface)
@reporter.step("Check {node} to {interface}")
def check_state(self, node: ClusterNode, interface: str) -> str:
shell = node.host.get_shell()
return shell.exec(f"ip link show {interface} | sed -z 's/.*state \(.*\) mode .*/\\1/'").stdout.strip()
@retry(max_attempts=5, sleep_interval=5, expected_result="UP")
def check_state_up(self, node: ClusterNode, interface: str) -> str:
return self.check_state(node=node, interface=interface)
@retry(max_attempts=5, sleep_interval=5, expected_result="DOWN")
def check_state_down(self, node: ClusterNode, interface: str) -> str:
return self.check_state(node=node, interface=interface)

View file

@ -13,7 +13,7 @@ from frostfs_testlib.plugins import load_all
from frostfs_testlib.resources.cli import FROSTFS_ADM_CONFIG_PATH, FROSTFS_ADM_EXEC, FROSTFS_CLI_EXEC from frostfs_testlib.resources.cli import FROSTFS_ADM_CONFIG_PATH, FROSTFS_ADM_EXEC, FROSTFS_CLI_EXEC
from frostfs_testlib.resources.common import DEFAULT_WALLET_CONFIG, MORPH_BLOCK_TIME from frostfs_testlib.resources.common import DEFAULT_WALLET_CONFIG, MORPH_BLOCK_TIME
from frostfs_testlib.shell import CommandOptions, Shell, SshConnectionProvider from frostfs_testlib.shell import CommandOptions, Shell, SshConnectionProvider
from frostfs_testlib.steps.network import IfUpDownHelper, IpTablesHelper from frostfs_testlib.steps.network import IpHelper
from frostfs_testlib.storage.cluster import Cluster, ClusterNode, S3Gate, StorageNode from frostfs_testlib.storage.cluster import Cluster, ClusterNode, S3Gate, StorageNode
from frostfs_testlib.storage.controllers.disk_controller import DiskController from frostfs_testlib.storage.controllers.disk_controller import DiskController
from frostfs_testlib.storage.dataclasses.node_base import NodeBase, ServiceClass from frostfs_testlib.storage.dataclasses.node_base import NodeBase, ServiceClass
@ -22,7 +22,6 @@ from frostfs_testlib.testing.test_control import retry, run_optionally, wait_for
from frostfs_testlib.utils.datetime_utils import parse_time from frostfs_testlib.utils.datetime_utils import parse_time
logger = logging.getLogger("NeoLogger") logger = logging.getLogger("NeoLogger")
if_up_down_helper = IfUpDownHelper()
class StateManager: class StateManager:
@ -305,57 +304,25 @@ class ClusterStateController:
[node.host.wait_success_resume_process(process_name) for node in list_nodes] [node.host.wait_success_resume_process(process_name) for node in list_nodes]
self.suspended_services = {} self.suspended_services = {}
@reporter.step("Drop traffic to {node}, with ports - {ports}, nodes - {block_nodes}") @reporter.step("Drop traffic to {node}, nodes - {block_nodes}")
def drop_traffic( def drop_traffic(
self, self,
mode: str,
node: ClusterNode, node: ClusterNode,
wakeup_timeout: int, wakeup_timeout: int,
ports: list[str] = None, name_interface: str,
block_nodes: list[ClusterNode] = None, block_nodes: list[ClusterNode] = None,
) -> None: ) -> None:
allowed_modes = ["ports", "nodes"] list_ip = self._parse_interfaces(block_nodes, name_interface)
assert mode in allowed_modes IpHelper.drop_input_traffic_to_node(node, list_ip)
match mode:
case "ports":
IpTablesHelper.drop_input_traffic_to_port(node, ports)
case "nodes":
list_ip = self._parse_intefaces(block_nodes)
IpTablesHelper.drop_input_traffic_to_node(node, list_ip)
time.sleep(wakeup_timeout) time.sleep(wakeup_timeout)
self.dropped_traffic.append(node) self.dropped_traffic.append(node)
@reporter.step("Ping traffic")
def ping_traffic(
self,
node: ClusterNode,
nodes_list: list[ClusterNode],
expect_result: int,
) -> bool:
shell = node.host.get_shell()
options = CommandOptions(check=False)
ips = self._parse_intefaces(nodes_list)
for ip in ips:
code = shell.exec(f"ping {ip} -c 1", options).return_code
if code != expect_result:
return False
return True
@reporter.step("Start traffic to {node}") @reporter.step("Start traffic to {node}")
def restore_traffic( def restore_traffic(
self, self,
mode: str,
node: ClusterNode, node: ClusterNode,
) -> None: ) -> None:
allowed_modes = ["ports", "nodes"] IpHelper.restore_input_traffic_to_node(node=node)
assert mode in allowed_modes
match mode:
case "ports":
IpTablesHelper.restore_input_traffic_to_port(node=node)
case "nodes":
IpTablesHelper.restore_input_traffic_to_node(node=node)
@reporter.step("Restore blocked nodes") @reporter.step("Restore blocked nodes")
def restore_all_traffic(self): def restore_all_traffic(self):
@ -385,22 +352,25 @@ class ClusterStateController:
@reporter.step("Down {interface} to {nodes}") @reporter.step("Down {interface} to {nodes}")
def down_interface(self, nodes: list[ClusterNode], interface: str): def down_interface(self, nodes: list[ClusterNode], interface: str):
for node in nodes: for node in nodes:
if_up_down_helper.down_interface(node=node, interface=interface) node.host.down_interface(interface=interface)
assert if_up_down_helper.check_state(node=node, interface=interface) == "DOWN" assert node.host.check_state(interface=interface) == "DOWN"
self.nodes_with_modified_interface.append(node) self.nodes_with_modified_interface.append(node)
@reporter.step("Up {interface} to {nodes}") @reporter.step("Up {interface} to {nodes}")
def up_interface(self, nodes: list[ClusterNode], interface: str): def up_interface(self, nodes: list[ClusterNode], interface: str):
for node in nodes: for node in nodes:
if_up_down_helper.up_interface(node=node, interface=interface) node.host.up_interface(interface=interface)
assert if_up_down_helper.check_state(node=node, interface=interface) == "UP" assert node.host.check_state(interface=interface) == "UP"
if node in self.nodes_with_modified_interface: if node in self.nodes_with_modified_interface:
self.nodes_with_modified_interface.remove(node) self.nodes_with_modified_interface.remove(node)
@reporter.step("Restore interface") @reporter.step("Restore interface")
def restore_interfaces(self): def restore_interfaces(self):
for node in self.nodes_with_modified_interface: for node in self.nodes_with_modified_interface:
if_up_down_helper.up_all_interface(node) dict_interfaces = node.host.config.interfaces.keys()
for name_interface in dict_interfaces:
if "mgmt" not in name_interface:
node.host.up_interface(interface=name_interface)
@reporter.step("Get node time") @reporter.step("Get node time")
def get_node_date(self, node: ClusterNode) -> datetime: def get_node_date(self, node: ClusterNode) -> datetime:
@ -523,15 +493,14 @@ class ClusterStateController:
return disk_controller return disk_controller
def _restore_traffic_to_node(self, node): def _restore_traffic_to_node(self, node):
IpTablesHelper.restore_input_traffic_to_port(node) IpHelper.restore_input_traffic_to_node(node)
IpTablesHelper.restore_input_traffic_to_node(node)
def _parse_intefaces(self, nodes: list[ClusterNode]): def _parse_interfaces(self, nodes: list[ClusterNode], name_interface: str):
interfaces = [] interfaces = []
for node in nodes: for node in nodes:
dict_interfaces = node.host.config.interfaces dict_interfaces = node.host.config.interfaces
for type, ip in dict_interfaces.items(): for type, ip in dict_interfaces.items():
if "mgmt" not in type: if name_interface in type:
interfaces.append(ip) interfaces.append(ip)
return interfaces return interfaces