import datetime import time import frostfs_testlib.resources.optionals as optionals from frostfs_testlib.healthcheck.interfaces import Healthcheck from frostfs_testlib.reporter import get_reporter from frostfs_testlib.shell import CommandOptions, Shell, SshConnectionProvider from frostfs_testlib.steps.network import IfUpDownHelper, IpTablesHelper from frostfs_testlib.storage.cluster import Cluster, ClusterNode, S3Gate, StorageNode from frostfs_testlib.storage.controllers.disk_controller import DiskController from frostfs_testlib.storage.dataclasses.node_base import NodeBase, ServiceClass from frostfs_testlib.testing import parallel from frostfs_testlib.testing.test_control import run_optionally, wait_for_success from frostfs_testlib.utils.failover_utils import ( wait_all_storage_nodes_returned, wait_for_host_offline, wait_for_host_online, wait_for_node_online, ) reporter = get_reporter() if_up_down_helper = IfUpDownHelper() class ClusterStateController: def __init__(self, shell: Shell, cluster: Cluster, healthcheck: Healthcheck) -> None: self.stopped_nodes: list[ClusterNode] = [] self.detached_disks: dict[str, DiskController] = {} self.dropped_traffic: list[ClusterNode] = [] self.stopped_services: set[NodeBase] = set() self.cluster = cluster self.healthcheck = healthcheck self.shell = shell self.suspended_services: dict[str, list[ClusterNode]] = {} self.nodes_with_modified_interface: list[ClusterNode] = [] def _get_stopped_by_node(self, node: ClusterNode) -> set[NodeBase]: stopped_by_node = [svc for svc in self.stopped_services if svc.host == node.host] return set(stopped_by_node) def _get_stopped_by_type(self, service_type: type[ServiceClass]) -> set[ServiceClass]: stopped_by_type = [svc for svc in self.stopped_services if isinstance(svc, service_type)] return set(stopped_by_type) def _from_stopped_nodes(self, service_type: type[ServiceClass]) -> set[ServiceClass]: stopped_on_nodes = set([node.service(service_type) for node in self.stopped_nodes]) return set(stopped_on_nodes) def _get_online(self, service_type: type[ServiceClass]) -> set[ServiceClass]: stopped_svc = self._get_stopped_by_type(service_type).union(self._from_stopped_nodes(service_type)) online_svc = set(self.cluster.services(service_type)) - stopped_svc return online_svc @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @reporter.step_deco("Stop host of node {node}") def stop_node_host(self, node: ClusterNode, mode: str): # Drop ssh connection for this node before shutdown provider = SshConnectionProvider() provider.drop(node.host_ip) self.stopped_nodes.append(node) with reporter.step(f"Stop host {node.host.config.address}"): node.host.stop_host(mode=mode) wait_for_host_offline(self.shell, node.storage_node) @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @reporter.step_deco("Shutdown whole cluster") def shutdown_cluster(self, mode: str, reversed_order: bool = False): nodes = reversed(self.cluster.cluster_nodes) if reversed_order else self.cluster.cluster_nodes # Drop all ssh connections before shutdown provider = SshConnectionProvider() provider.drop_all() for node in nodes: with reporter.step(f"Stop host {node.host.config.address}"): self.stopped_nodes.append(node) node.host.stop_host(mode=mode) for node in nodes: wait_for_host_offline(self.shell, node.storage_node) @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @reporter.step_deco("Start host of node {node}") def start_node_host(self, node: ClusterNode): with reporter.step(f"Start host {node.host.config.address}"): node.host.start_host() wait_for_host_online(self.shell, node.storage_node) wait_for_node_online(node.storage_node) self.stopped_nodes.remove(node) @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @reporter.step_deco("Start stopped hosts") def start_stopped_hosts(self, reversed_order: bool = False): if not self.stopped_nodes: return nodes = reversed(self.stopped_nodes) if reversed_order else self.stopped_nodes for node in nodes: with reporter.step(f"Start host {node.host.config.address}"): node.host.start_host() self.stopped_services.difference_update(self._get_stopped_by_node(node)) self.stopped_nodes = [] self.wait_after_storage_startup() @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @reporter.step_deco("Detach disk {device} at {mountpoint} on node {node}") def detach_disk(self, node: StorageNode, device: str, mountpoint: str): disk_controller = self._get_disk_controller(node, device, mountpoint) self.detached_disks[disk_controller.id] = disk_controller disk_controller.detach() @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @reporter.step_deco("Attach disk {device} at {mountpoint} on node {node}") def attach_disk(self, node: StorageNode, device: str, mountpoint: str): disk_controller = self._get_disk_controller(node, device, mountpoint) disk_controller.attach() self.detached_disks.pop(disk_controller.id, None) @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @reporter.step_deco("Restore detached disks") def restore_disks(self): for disk_controller in self.detached_disks.values(): disk_controller.attach() self.detached_disks = {} @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @reporter.step_deco("Stop all {service_type} services") def stop_services_of_type(self, service_type: type[ServiceClass], mask: bool = True): services = self.cluster.services(service_type) self.stopped_services.update(services) parallel([service.stop_service for service in services], mask=mask) @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @reporter.step_deco("Start all {service_type} services") def start_services_of_type(self, service_type: type[ServiceClass]): services = self.cluster.services(service_type) parallel([service.start_service for service in services]) self.stopped_services.difference_update(set(services)) if service_type == StorageNode: self.wait_after_storage_startup() @wait_for_success(600, 60) def wait_s3gate(self, s3gate: S3Gate): with reporter.step(f"Wait for {s3gate} reconnection"): result = s3gate.get_metric("frostfs_s3_gw_pool_current_nodes") assert 'address="127.0.0.1' in result.stdout, "S3Gate should connect to local storage node" @reporter.step_deco("Wait for S3Gates reconnection to local storage") def wait_s3gates(self): online_s3gates = self._get_online(S3Gate) parallel(self.wait_s3gate, online_s3gates) @wait_for_success(600, 60) def wait_tree_healthcheck(self): nodes = self.cluster.nodes(self._get_online(StorageNode)) parallel(self.healthcheck.tree_healthcheck, nodes) @reporter.step_deco("Wait for storage reconnection to the system") def wait_after_storage_startup(self): wait_all_storage_nodes_returned(self.shell, self.cluster) self.wait_s3gates() self.wait_tree_healthcheck() @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @reporter.step_deco("Start all stopped services") def start_all_stopped_services(self): stopped_storages = self._get_stopped_by_type(StorageNode) parallel([service.start_service for service in self.stopped_services]) self.stopped_services.clear() if stopped_storages: self.wait_after_storage_startup() @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @reporter.step_deco("Stop {service_type} service on {node}") def stop_service_of_type(self, node: ClusterNode, service_type: type[ServiceClass], mask: bool = True): service = node.service(service_type) service.stop_service(mask) self.stopped_services.add(service) @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @reporter.step_deco("Start {service_type} service on {node}") def start_service_of_type(self, node: ClusterNode, service_type: type[ServiceClass]): service = node.service(service_type) service.start_service() self.stopped_services.discard(service) @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @reporter.step_deco("Start all stopped {service_type} services") def start_stopped_services_of_type(self, service_type: type[ServiceClass]): stopped_svc = self._get_stopped_by_type(service_type) if not stopped_svc: return parallel([svc.start_service for svc in stopped_svc]) self.stopped_services.difference_update(stopped_svc) if service_type == StorageNode: self.wait_after_storage_startup() # TODO: Deprecated @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @reporter.step_deco("Stop all storage services on cluster") def stop_all_storage_services(self, reversed_order: bool = False): nodes = reversed(self.cluster.cluster_nodes) if reversed_order else self.cluster.cluster_nodes for node in nodes: self.stop_service_of_type(node, StorageNode) # TODO: Deprecated @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @reporter.step_deco("Stop all S3 gates on cluster") def stop_all_s3_gates(self, reversed_order: bool = False): nodes = reversed(self.cluster.cluster_nodes) if reversed_order else self.cluster.cluster_nodes for node in nodes: self.stop_service_of_type(node, S3Gate) # TODO: Deprecated @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @reporter.step_deco("Stop storage service on {node}") def stop_storage_service(self, node: ClusterNode, mask: bool = True): self.stop_service_of_type(node, StorageNode, mask) # TODO: Deprecated @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @reporter.step_deco("Start storage service on {node}") def start_storage_service(self, node: ClusterNode): self.start_service_of_type(node, StorageNode) # TODO: Deprecated @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @reporter.step_deco("Start stopped storage services") def start_stopped_storage_services(self): self.start_stopped_services_of_type(StorageNode) # TODO: Deprecated @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @reporter.step_deco("Stop s3 gate on {node}") def stop_s3_gate(self, node: ClusterNode, mask: bool = True): self.stop_service_of_type(node, S3Gate, mask) # TODO: Deprecated @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @reporter.step_deco("Start s3 gate on {node}") def start_s3_gate(self, node: ClusterNode): self.start_service_of_type(node, S3Gate) # TODO: Deprecated @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @reporter.step_deco("Start stopped S3 gates") def start_stopped_s3_gates(self): self.start_stopped_services_of_type(S3Gate) @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @reporter.step_deco("Suspend {process_name} service in {node}") def suspend_service(self, process_name: str, node: ClusterNode): node.host.wait_success_suspend_process(process_name) if self.suspended_services.get(process_name): self.suspended_services[process_name].append(node) else: self.suspended_services[process_name] = [node] @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @reporter.step_deco("Resume {process_name} service in {node}") def resume_service(self, process_name: str, node: ClusterNode): node.host.wait_success_resume_process(process_name) if self.suspended_services.get(process_name) and node in self.suspended_services[process_name]: self.suspended_services[process_name].remove(node) @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @reporter.step_deco("Start suspend processes services") def resume_suspended_services(self): for process_name, list_nodes in self.suspended_services.items(): [node.host.wait_success_resume_process(process_name) for node in list_nodes] self.suspended_services = {} @reporter.step_deco("Drop traffic to {node}, with ports - {ports}, nodes - {block_nodes}") def drop_traffic( self, mode: str, node: ClusterNode, wakeup_timeout: int, ports: list[str] = None, block_nodes: list[ClusterNode] = None, ) -> None: allowed_modes = ["ports", "nodes"] assert mode in allowed_modes match mode: case "ports": IpTablesHelper.drop_input_traffic_to_port(node, ports) case "nodes": list_ip = self._parse_intefaces(block_nodes) IpTablesHelper.drop_input_traffic_to_node(node, list_ip) time.sleep(wakeup_timeout) self.dropped_traffic.append(node) @reporter.step_deco("Ping traffic") def ping_traffic( self, node: ClusterNode, nodes_list: list[ClusterNode], expect_result: int, ) -> bool: shell = node.host.get_shell() options = CommandOptions(check=False) ips = self._parse_intefaces(nodes_list) for ip in ips: code = shell.exec(f"ping {ip} -c 1", options).return_code if code != expect_result: return False return True @reporter.step_deco("Start traffic to {node}") def restore_traffic( self, mode: str, node: ClusterNode, ) -> None: allowed_modes = ["ports", "nodes"] assert mode in allowed_modes match mode: case "ports": IpTablesHelper.restore_input_traffic_to_port(node=node) case "nodes": IpTablesHelper.restore_input_traffic_to_node(node=node) @reporter.step_deco("Restore blocked nodes") def restore_all_traffic(self): parallel(self._restore_traffic_to_node, self.dropped_traffic) @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @reporter.step_deco("Hard reboot host {node} via magic SysRq option") def panic_reboot_host(self, node: ClusterNode, wait_for_return: bool = True): shell = node.host.get_shell() shell.exec('sudo sh -c "echo 1 > /proc/sys/kernel/sysrq"') options = CommandOptions(close_stdin=True, timeout=1, check=False) shell.exec('sudo sh -c "echo b > /proc/sysrq-trigger"', options) # Drop ssh connection for this node provider = SshConnectionProvider() provider.drop(node.host_ip) if wait_for_return: # Let the things to be settled # A little wait here to prevent ssh stuck during panic time.sleep(10) wait_for_host_online(self.shell, node.storage_node) wait_for_node_online(node.storage_node) @reporter.step_deco("Down {interface} to {nodes}") def down_interface(self, nodes: list[ClusterNode], interface: str): for node in nodes: if_up_down_helper.down_interface(node=node, interface=interface) assert if_up_down_helper.check_state(node=node, interface=interface) == "DOWN" self.nodes_with_modified_interface.append(node) @reporter.step_deco("Up {interface} to {nodes}") def up_interface(self, nodes: list[ClusterNode], interface: str): for node in nodes: if_up_down_helper.up_interface(node=node, interface=interface) assert if_up_down_helper.check_state(node=node, interface=interface) == "UP" if node in self.nodes_with_modified_interface: self.nodes_with_modified_interface.remove(node) @reporter.step_deco("Restore interface") def restore_interfaces(self): for node in self.nodes_with_modified_interface: if_up_down_helper.up_all_interface(node) @reporter.step_deco("Get node time") def get_node_date(self, node: ClusterNode) -> datetime: shell = node.host.get_shell() return datetime.datetime.strptime(shell.exec("hwclock -r").stdout.strip(), "%Y-%m-%d %H:%M:%S.%f%z") @reporter.step_deco("Set node time to {in_date}") def change_node_date(self, node: ClusterNode, in_date: datetime) -> None: shell = node.host.get_shell() shell.exec(f"hwclock --set --date='{in_date}'") shell.exec("hwclock --hctosys") node_time = self.get_node_date(node) with reporter.step(f"Verify difference between {node_time} and {in_date} is less than a minute"): assert (self.get_node_date(node) - in_date) < datetime.timedelta(minutes=1) @reporter.step_deco(f"Restore time") def restore_node_date(self, node: ClusterNode) -> None: shell = node.host.get_shell() now_time = datetime.datetime.now(datetime.timezone.utc) with reporter.step(f"Set {now_time} time"): shell.exec(f"hwclock --set --date='{now_time}'") shell.exec("hwclock --hctosys") @reporter.step_deco("Change the synchronizer status to {status}") def set_sync_date_all_nodes(self, status: str): if status == "active": parallel(self._enable_date_synchronizer, self.cluster.cluster_nodes) return parallel(self._disable_date_synchronizer, self.cluster.cluster_nodes) def _enable_date_synchronizer(self, cluster_node: ClusterNode): shell = cluster_node.host.get_shell() shell.exec("timedatectl set-ntp true") cluster_node.host.wait_for_service_to_be_in_state("systemd-timesyncd", "active", 5) def _disable_date_synchronizer(self, cluster_node: ClusterNode): shell = cluster_node.host.get_shell() shell.exec("timedatectl set-ntp false") cluster_node.host.wait_for_service_to_be_in_state("systemd-timesyncd", "inactive", 5) def _get_disk_controller(self, node: StorageNode, device: str, mountpoint: str) -> DiskController: disk_controller_id = DiskController.get_id(node, device) if disk_controller_id in self.detached_disks.keys(): disk_controller = self.detached_disks[disk_controller_id] else: disk_controller = DiskController(node, device, mountpoint) return disk_controller def _restore_traffic_to_node(self, node): IpTablesHelper.restore_input_traffic_to_port(node) IpTablesHelper.restore_input_traffic_to_node(node) def _parse_intefaces(self, nodes: list[ClusterNode]): interfaces = [] for node in nodes: dict_interfaces = node.host.config.interfaces for type, ip in dict_interfaces.items(): if "mgmt" not in type: interfaces.append(ip) return interfaces