import shlex
from typing import Callable

from frostfs_testlib.cli.frostfs_cli.cli import FrostfsCli
from frostfs_testlib.healthcheck.interfaces import Healthcheck
from frostfs_testlib.reporter import get_reporter
from frostfs_testlib.resources.cli import FROSTFS_CLI_EXEC
from frostfs_testlib.shell import CommandOptions
from frostfs_testlib.steps.node_management import storage_node_healthcheck
from frostfs_testlib.storage.cluster import ClusterNode, ServiceClass
from frostfs_testlib.testing.test_control import wait_for_success
from frostfs_testlib.utils.failover_utils import check_services_status

reporter = get_reporter()


class BasicHealthcheck(Healthcheck):
    """Healthcheck implementation built from small per-node check callables.

    Every public ``*_healthcheck`` method assembles a mapping of
    ``check callable -> keyword arguments`` and hands it to :meth:`_perform`,
    which raises ``AssertionError`` if any check reports an issue. The public
    methods are wrapped in ``wait_for_success``; the retry semantics are
    whatever that decorator defines.
    """

    def _perform(self, cluster_node: ClusterNode, checks: dict[Callable, dict]):
        """Run every check against ``cluster_node`` and fail if any reports an issue.

        Args:
            cluster_node: Node passed as the first positional argument to each check.
            checks: Mapping of check callable to the extra keyword arguments it is
                called with. A check signals a problem by returning a truthy value
                (an issue description string); a falsy return means "no issue".

        Raises:
            AssertionError: If at least one check returned an issue. The message
                lists all collected issues, one per line.
        """
        issues: list[str] = []
        for check, kwargs in checks.items():
            issue = check(cluster_node, **kwargs)
            if issue:
                issues.append(issue)

        assert not issues, "Issues found:\n" + "\n".join(issues)

    @wait_for_success(900, 30)
    def full_healthcheck(self, cluster_node: ClusterNode):
        """Run the storage and tree healthchecks for ``cluster_node``.

        Raises:
            AssertionError: Propagated from :meth:`_perform` when a check fails.
        """
        # NOTE(review): storage_healthcheck has no return statement and reports
        # problems by raising from its own _perform call, so here its failures
        # surface as exceptions rather than as collected issue strings.
        checks = {
            self.storage_healthcheck: {},
            self._tree_healthcheck: {},
        }

        with reporter.step(f"Perform full healthcheck for {cluster_node}"):
            self._perform(cluster_node, checks)

    @wait_for_success(900, 30)
    def startup_healthcheck(self, cluster_node: ClusterNode):
        """Run the storage and tree healthchecks for ``cluster_node`` at startup.

        Performs the same set of checks as :meth:`full_healthcheck`; only the
        reported step title differs.

        Raises:
            AssertionError: Propagated from :meth:`_perform` when a check fails.
        """
        checks = {
            self.storage_healthcheck: {},
            self._tree_healthcheck: {},
        }

        with reporter.step(f"Perform startup healthcheck on {cluster_node}"):
            self._perform(cluster_node, checks)

    @wait_for_success(900, 30)
    def storage_healthcheck(self, cluster_node: ClusterNode) -> str | None:
        """Run only the storage node healthcheck for ``cluster_node``.

        The method body has no ``return`` statement: an unhealthy node is
        reported by the ``AssertionError`` raised from :meth:`_perform`, not by
        a returned string.

        Raises:
            AssertionError: Propagated from :meth:`_perform` when the check fails.
        """
        checks = {
            self._storage_healthcheck: {},
        }

        with reporter.step(f"Perform storage healthcheck on {cluster_node}"):
            self._perform(cluster_node, checks)

    @wait_for_success(120, 5)
    def services_healthcheck(self, cluster_node: ClusterNode):
        """Check the services of ``cluster_node``.

        Runs ``check_services_status`` with ``expected_status="active"`` and the
        per-service healthcheck from :meth:`_check_services` over
        ``cluster_node.services``.

        Raises:
            AssertionError: Propagated from :meth:`_perform` when a check fails.
        """
        svcs_to_check = cluster_node.services
        checks = {
            check_services_status: {
                "service_list": svcs_to_check,
                "expected_status": "active",
            },
            self._check_services: {"services": svcs_to_check},
        }

        with reporter.step(f"Perform service healthcheck on {cluster_node}"):
            self._perform(cluster_node, checks)

    def _check_services(self, cluster_node: ClusterNode, services: list[ServiceClass]):
        """Call ``service_healthcheck()`` on each service and report the first failure.

        Args:
            cluster_node: Node the services belong to; used only in the message.
            services: Services to check, in order.

        Returns:
            An issue description for the first service whose healthcheck result
            equals ``False``, or ``None`` if there is no such service.
        """
        for svc in services:
            result = svc.service_healthcheck()
            # Deliberate equality comparison: a ``None`` result does not compare
            # equal to False and is therefore not treated as a failure.
            if result == False:  # noqa: E712
                return f"Service {svc.get_service_systemctl_name()} healthcheck failed on node {cluster_node}."
        return None

    @reporter.step_deco("Storage healthcheck on {cluster_node}")
    def _storage_healthcheck(self, cluster_node: ClusterNode) -> str | None:
        """Check that the storage node reports ``READY`` health and ``ONLINE`` network status.

        Socket information is gathered via :meth:`_gather_socket_info` on every
        call, before the statuses are evaluated.

        Returns:
            An issue description if either status differs from the expected
            value, otherwise ``None``.
        """
        result = storage_node_healthcheck(cluster_node.storage_node)
        self._gather_socket_info(cluster_node)
        if result.health_status != "READY" or result.network_status != "ONLINE":
            return f"Node {cluster_node} is not healthy. Health={result.health_status}. Network={result.network_status}"
        return None

    @reporter.step_deco("Tree healthcheck on {cluster_node}")
    def _tree_healthcheck(self, cluster_node: ClusterNode) -> str | None:
        """Run ``frostfs-cli tree healthcheck`` on the node's host against ``127.0.0.1:8080``.

        A CLI config file with the storage node's wallet path and password is
        written to ``/tmp/<storage node name>-config.yaml`` on the host and
        passed to the CLI as its config file.

        Returns:
            An issue description containing the return code, stdout and stderr
            if the CLI exits with a non-zero code, otherwise ``None``.
        """
        host = cluster_node.host
        service_config = host.get_service_config(cluster_node.storage_node.name)
        wallet_path = service_config.attributes["wallet_path"]
        wallet_password = service_config.attributes["wallet_password"]

        shell = host.get_shell()
        wallet_config_path = f"/tmp/{cluster_node.storage_node.name}-config.yaml"
        wallet_config = f'wallet: {wallet_path}\npassword: "{wallet_password}"'
        # Quote both the payload and the target path for the shell: a wallet
        # password (or path) containing a single quote or other shell
        # metacharacters would otherwise break or alter the command.
        shell.exec(f"echo {shlex.quote(wallet_config)} > {shlex.quote(wallet_config_path)}")

        remote_cli = FrostfsCli(
            shell,
            host.get_cli_config(FROSTFS_CLI_EXEC).exec_path,
            config_file=wallet_config_path,
        )
        result = remote_cli.tree.healthcheck(rpc_endpoint="127.0.0.1:8080")
        if result.return_code != 0:
            return (
                f"Error during tree healthcheck (rc={result.return_code}): {result.stdout}. \n Stderr: {result.stderr}"
            )
        return None

    @reporter.step_deco("Gather socket info for {cluster_node}")
    def _gather_socket_info(self, cluster_node: ClusterNode):
        """Run ``ss -tuln | grep 8080`` on the node's host.

        The command is executed with ``check=False`` and its result is not used
        by this method.
        """
        cluster_node.host.get_shell().exec("ss -tuln | grep 8080", CommandOptions(check=False))