import copy
import time

import frostfs_testlib.resources.optionals as optionals
from frostfs_testlib.reporter import get_reporter
from frostfs_testlib.shell import CommandOptions, Shell
from frostfs_testlib.storage.cluster import Cluster, ClusterNode, StorageNode
from frostfs_testlib.storage.controllers.disk_controller import DiskController
from frostfs_testlib.testing import parallel
from frostfs_testlib.testing.test_control import run_optionally
from frostfs_testlib.utils.failover_utils import (
    wait_all_storage_nodes_returned,
    wait_for_host_offline,
    wait_for_host_online,
    wait_for_node_online,
)

reporter = get_reporter()


class ClusterStateController:
    """Stop/start hosts, services and disks of a cluster and remember what was changed.

    Every "stop"-like operation records the affected node (or disk controller) in one
    of the bookkeeping collections created in ``__init__``; the matching "start"-like
    operation removes it again. The ``start_stopped_*`` / ``restore_*`` /
    ``resume_suspended_*`` methods use that bookkeeping to bring back everything that
    is still recorded as changed.

    NOTE(review): all public methods are wrapped in
    ``run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)``; presumably they become
    no-ops when failover is disabled -- confirm against ``run_optionally``.
    """

    def __init__(self, shell: Shell, cluster: Cluster) -> None:
        """Create a controller with empty bookkeeping.

        Args:
            shell: Shell passed to the ``wait_for_*`` failover helpers.
            cluster: Cluster whose nodes are controlled.
        """
        # Nodes whose host was stopped through this controller.
        self.stopped_nodes: list[ClusterNode] = []
        # Detached disks, keyed by DiskController.id.
        self.detached_disks: dict[str, DiskController] = {}
        # Nodes whose storage service was stopped through this controller.
        self.stopped_storage_nodes: list[ClusterNode] = []
        # Nodes whose S3 gate was stopped through this controller.
        self.stopped_s3_gates: list[ClusterNode] = []
        self.cluster = cluster
        self.shell = shell
        # Process name -> nodes on which that process is currently suspended.
        self.suspended_services: dict[str, list[ClusterNode]] = {}

    @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
    @reporter.step_deco("Stop host of node {node}")
    def stop_node_host(self, node: ClusterNode, mode: str):
        """Stop the host of ``node``, wait until it is offline and record it as stopped.

        Args:
            node: Cluster node whose host is stopped.
            mode: Forwarded unchanged to ``node.host.stop_host``.
        """
        with reporter.step(f"Stop host {node.host.config.address}"):
            node.host.stop_host(mode=mode)
            wait_for_host_offline(self.shell, node.storage_node)
            self.stopped_nodes.append(node)

    @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
    @reporter.step_deco("Shutdown whole cluster")
    def shutdown_cluster(self, mode: str, reversed_order: bool = False):
        """Stop the hosts of all cluster nodes, then wait for each of them to go offline.

        Args:
            mode: Forwarded unchanged to ``host.stop_host``.
            reversed_order: Process the cluster nodes in reverse order.
        """
        # The order is materialised as a list because it is iterated twice below;
        # a bare ``reversed(...)`` iterator would be exhausted by the first loop and
        # the offline wait would silently be skipped when ``reversed_order`` is set.
        nodes = self._cluster_nodes(reversed_order)

        for node in nodes:
            with reporter.step(f"Stop host {node.host.config.address}"):
                # Recorded before the stop call, so the node stays tracked even if
                # stopping raises.
                self.stopped_nodes.append(node)
                node.host.stop_host(mode=mode)

        for node in nodes:
            wait_for_host_offline(self.shell, node.storage_node)

    @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
    @reporter.step_deco("Stop all storage services on cluster")
    def stop_all_storage_services(self, reversed_order: bool = False):
        """Stop the storage service on every cluster node.

        Args:
            reversed_order: Process the cluster nodes in reverse order.
        """
        for node in self._cluster_nodes(reversed_order):
            self.stop_storage_service(node)

    @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
    @reporter.step_deco("Stop all S3 gates on cluster")
    def stop_all_s3_gates(self, reversed_order: bool = False):
        """Stop the S3 gate on every cluster node.

        Args:
            reversed_order: Process the cluster nodes in reverse order.
        """
        for node in self._cluster_nodes(reversed_order):
            self.stop_s3_gate(node)

    @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
    @reporter.step_deco("Start host of node {node}")
    def start_node_host(self, node: ClusterNode):
        """Start the host of ``node``, wait for host and storage node, and untrack it.

        Raises:
            ValueError: If ``node`` is not recorded in ``stopped_nodes``
                (raised by ``list.remove`` after the host has been started).
        """
        with reporter.step(f"Start host {node.host.config.address}"):
            node.host.start_host()
            wait_for_host_online(self.shell, node.storage_node)
            wait_for_node_online(node.storage_node)
            self.stopped_nodes.remove(node)

    @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
    @reporter.step_deco("Start stopped hosts")
    def start_stopped_hosts(self, reversed_order: bool = False):
        """Start every host recorded in ``stopped_nodes`` and wait for storage nodes.

        Nodes that are also recorded as having a stopped storage service or S3 gate
        are removed from those lists as well.

        Args:
            reversed_order: Start the hosts in the reverse of the order they were recorded.
        """
        if not self.stopped_nodes:
            return

        nodes = reversed(self.stopped_nodes) if reversed_order else self.stopped_nodes
        for node in nodes:
            with reporter.step(f"Start host {node.host.config.address}"):
                node.host.start_host()
                if node in self.stopped_storage_nodes:
                    self.stopped_storage_nodes.remove(node)

                if node in self.stopped_s3_gates:
                    self.stopped_s3_gates.remove(node)

        self.stopped_nodes = []
        wait_all_storage_nodes_returned(self.shell, self.cluster)

    @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
    @reporter.step_deco("Detach disk {device} at {mountpoint} on node {node}")
    def detach_disk(self, node: StorageNode, device: str, mountpoint: str):
        """Detach ``device`` on ``node`` and record its controller for later restore."""
        disk_controller = self._get_disk_controller(node, device, mountpoint)
        # Recorded before detaching, so the disk stays tracked even if detach raises.
        self.detached_disks[disk_controller.id] = disk_controller
        disk_controller.detach()

    @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
    @reporter.step_deco("Attach disk {device} at {mountpoint} on node {node}")
    def attach_disk(self, node: StorageNode, device: str, mountpoint: str):
        """Attach ``device`` on ``node`` and drop it from the detached-disk bookkeeping."""
        disk_controller = self._get_disk_controller(node, device, mountpoint)
        disk_controller.attach()
        # Tolerates disks that were never recorded as detached.
        self.detached_disks.pop(disk_controller.id, None)

    @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
    @reporter.step_deco("Restore detached disks")
    def restore_disks(self):
        """Attach every disk still recorded as detached and clear the bookkeeping."""
        for disk_controller in self.detached_disks.values():
            disk_controller.attach()
        self.detached_disks = {}

    @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
    @reporter.step_deco("Stop storage service on {node}")
    def stop_storage_service(self, node: ClusterNode):
        """Stop the storage service of ``node`` and record the node as stopped."""
        node.storage_node.stop_service()
        self.stopped_storage_nodes.append(node)

    @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
    @reporter.step_deco("Start storage service on {node}")
    def start_storage_service(self, node: ClusterNode):
        """Start the storage service of ``node`` and untrack the node.

        Raises:
            ValueError: If ``node`` is not recorded in ``stopped_storage_nodes``
                (raised by ``list.remove`` after the service has been started).
        """
        node.storage_node.start_service()
        self.stopped_storage_nodes.remove(node)

    @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
    @reporter.step_deco("Start stopped storage services")
    def start_stopped_storage_services(self):
        """Start all storage services recorded as stopped and wait for the nodes."""
        if not self.stopped_storage_nodes:
            return

        # If several services were stopped, for example s01-s04:
        # after starting only s01 it may require connections to s02-s04, which are
        # still down, and fail to start. Also, if something goes wrong here we might
        # skip starting s02-s04 entirely and leave the cluster in a bad state.
        # To make sure every service is at least attempted, start them in parallel.
        # A copy is passed because start_storage_service mutates the tracked list.
        parallel(self.start_storage_service, copy.copy(self.stopped_storage_nodes))
        wait_all_storage_nodes_returned(self.shell, self.cluster)
        self.stopped_storage_nodes = []

    @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
    @reporter.step_deco("Stop s3 gate on {node}")
    def stop_s3_gate(self, node: ClusterNode):
        """Stop the S3 gate of ``node`` and record the node as stopped."""
        node.s3_gate.stop_service()
        self.stopped_s3_gates.append(node)

    @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
    @reporter.step_deco("Start s3 gate on {node}")
    def start_s3_gate(self, node: ClusterNode):
        """Start the S3 gate of ``node`` and untrack the node.

        Raises:
            ValueError: If ``node`` is not recorded in ``stopped_s3_gates``
                (raised by ``list.remove`` after the gate has been started).
        """
        node.s3_gate.start_service()
        self.stopped_s3_gates.remove(node)

    @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
    @reporter.step_deco("Start stopped S3 gates")
    def start_stopped_s3_gates(self):
        """Start all S3 gates recorded as stopped and clear the bookkeeping."""
        if not self.stopped_s3_gates:
            return

        # A copy is passed because start_s3_gate mutates the tracked list.
        parallel(self.start_s3_gate, copy.copy(self.stopped_s3_gates))
        self.stopped_s3_gates = []

    @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
    @reporter.step_deco("Suspend {process_name} service in {node}")
    def suspend_service(self, process_name: str, node: ClusterNode):
        """Suspend ``process_name`` on the host of ``node`` and record it as suspended."""
        node.host.wait_success_suspend_process(process_name)
        self.suspended_services.setdefault(process_name, []).append(node)

    @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
    @reporter.step_deco("Resume {process_name} service in {node}")
    def resume_service(self, process_name: str, node: ClusterNode):
        """Resume ``process_name`` on the host of ``node`` and untrack it.

        Nodes that were never recorded as suspended for this process are tolerated.
        """
        node.host.wait_success_resume_process(process_name)
        # The process is running again, so it must no longer be listed as suspended;
        # otherwise resume_suspended_services() would try to resume it once more.
        suspended_nodes = self.suspended_services.get(process_name)
        if suspended_nodes and node in suspended_nodes:
            suspended_nodes.remove(node)

    @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
    @reporter.step_deco("Start suspend processes services")
    def resume_suspended_services(self):
        """Resume every process still recorded as suspended and clear the bookkeeping."""
        for process_name, list_nodes in self.suspended_services.items():
            for node in list_nodes:
                node.host.wait_success_resume_process(process_name)
        self.suspended_services = {}

    @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
    @reporter.step_deco("Hard reboot host {node} via magic SysRq option")
    def panic_reboot_host(self, node: ClusterNode, wait_for_return: bool = True):
        """Reboot the host of ``node`` through the kernel's magic SysRq trigger.

        Args:
            node: Cluster node whose host is rebooted.
            wait_for_return: When true, sleep briefly and then wait until the host
                and its storage node are online again.
        """
        shell = node.host.get_shell()
        # Writing 1 to kernel.sysrq enables the SysRq functions; writing 'b' to
        # sysrq-trigger then requests the reboot.
        shell.exec('sudo sh -c "echo 1 > /proc/sys/kernel/sysrq"')

        # NOTE(review): short timeout and check=False presumably because the command
        # cannot complete normally once the host goes down -- confirm.
        options = CommandOptions(close_stdin=True, timeout=1, check=False)
        shell.exec('sudo sh -c "echo b > /proc/sysrq-trigger"', options)

        if wait_for_return:
            # Let the things to be settled
            # A little wait here to prevent ssh stuck during panic
            time.sleep(10)
            wait_for_host_online(self.shell, node.storage_node)
            wait_for_node_online(node.storage_node)

    def _cluster_nodes(self, reversed_order: bool = False) -> list[ClusterNode]:
        """Return the cluster nodes as a fresh list, reversed when requested."""
        nodes = list(self.cluster.cluster_nodes)
        if reversed_order:
            nodes.reverse()
        return nodes

    def _get_disk_controller(
        self, node: StorageNode, device: str, mountpoint: str
    ) -> DiskController:
        """Return the tracked controller for ``device`` on ``node``, or a new one."""
        disk_controller_id = DiskController.get_id(node, device)
        if disk_controller_id in self.detached_disks:
            # Reuse the controller that performed the detach.
            return self.detached_disks[disk_controller_id]
        return DiskController(node, device, mountpoint)