diff --git a/src/frostfs_testlib/healthcheck/basic_healthcheck.py b/src/frostfs_testlib/healthcheck/basic_healthcheck.py index 3f4bc79..9c1d151 100644 --- a/src/frostfs_testlib/healthcheck/basic_healthcheck.py +++ b/src/frostfs_testlib/healthcheck/basic_healthcheck.py @@ -1,5 +1,7 @@ +from frostfs_testlib.cli.frostfs_cli.cli import FrostfsCli from frostfs_testlib.healthcheck.interfaces import Healthcheck from frostfs_testlib.reporter import get_reporter +from frostfs_testlib.resources.cli import FROSTFS_CLI_EXEC from frostfs_testlib.steps.node_management import storage_node_healthcheck from frostfs_testlib.storage.cluster import ClusterNode @@ -9,6 +11,33 @@ reporter = get_reporter() class BasicHealthcheck(Healthcheck): @reporter.step_deco("Perform healthcheck for {cluster_node}") def perform(self, cluster_node: ClusterNode): - health_check = storage_node_healthcheck(cluster_node.storage_node) - if health_check.health_status != "READY" or health_check.network_status != "ONLINE": - raise AssertionError("Node {cluster_node} is not healthy") + result = self.storage_healthcheck(cluster_node) + if result: + raise AssertionError(result) + + @reporter.step_deco("Tree healthcheck on {cluster_node}") + def tree_healthcheck(self, cluster_node: ClusterNode) -> str | None: + host = cluster_node.host + service_config = host.get_service_config(cluster_node.storage_node.name) + wallet_path = service_config.attributes["wallet_path"] + wallet_password = service_config.attributes["wallet_password"] + + shell = host.get_shell() + wallet_config_path = f"/tmp/{cluster_node.storage_node.name}-config.yaml" + wallet_config = f'wallet: {wallet_path}\npassword: "{wallet_password}"' + shell.exec(f"echo '{wallet_config}' > {wallet_config_path}") + + remote_cli = FrostfsCli( + shell, + host.get_cli_config(FROSTFS_CLI_EXEC).exec_path, + config_file=wallet_config_path, + ) + result = remote_cli.tree.healthcheck(rpc_endpoint="127.0.0.1:8080") + if result.return_code != 0: + return f"Error during tree healthcheck (rc={result.return_code}): {result.stdout}. \n Stderr: {result.stderr}" + + @reporter.step_deco("Storage healthcheck on {cluster_node}") + def storage_healthcheck(self, cluster_node: ClusterNode) -> str | None: + result = storage_node_healthcheck(cluster_node.storage_node) + if result.health_status != "READY" or result.network_status != "ONLINE": + return f"Node {cluster_node} is not healthy. Health={result.health_status}. Network={result.network_status}" diff --git a/src/frostfs_testlib/healthcheck/interfaces.py b/src/frostfs_testlib/healthcheck/interfaces.py index 0c77957..a036a82 100644 --- a/src/frostfs_testlib/healthcheck/interfaces.py +++ b/src/frostfs_testlib/healthcheck/interfaces.py @@ -7,3 +7,11 @@ class Healthcheck(ABC): @abstractmethod def perform(self, cluster_node: ClusterNode): """Perform healthcheck on the target cluster node""" + + @abstractmethod + def tree_healthcheck(self, cluster_node: ClusterNode): + """Check tree sync status on target cluster node""" + + @abstractmethod + def storage_healthcheck(self, cluster_node: ClusterNode): + """Perform storage node healthcheck on target cluster node""" diff --git a/src/frostfs_testlib/load/runners.py b/src/frostfs_testlib/load/runners.py index 635247e..4c07100 100644 --- a/src/frostfs_testlib/load/runners.py +++ b/src/frostfs_testlib/load/runners.py @@ -31,7 +31,8 @@ from frostfs_testlib.storage.dataclasses.frostfs_services import S3Gate, Storage from frostfs_testlib.storage.dataclasses.wallet import WalletInfo from frostfs_testlib.testing import parallel, run_optionally from frostfs_testlib.testing.test_control import retry -from frostfs_testlib.utils import FileKeeper, datetime_utils +from frostfs_testlib.utils import datetime_utils +from frostfs_testlib.utils.file_keeper import FileKeeper reporter = get_reporter() diff --git a/src/frostfs_testlib/storage/controllers/cluster_state_controller.py b/src/frostfs_testlib/storage/controllers/cluster_state_controller.py index deb8c7f..2cf1451 100644 --- a/src/frostfs_testlib/storage/controllers/cluster_state_controller.py +++ b/src/frostfs_testlib/storage/controllers/cluster_state_controller.py @@ -1,15 +1,15 @@ -import copy import time import frostfs_testlib.resources.optionals as optionals +from frostfs_testlib.healthcheck.interfaces import Healthcheck from frostfs_testlib.reporter import get_reporter from frostfs_testlib.shell import CommandOptions, Shell, SshConnectionProvider from frostfs_testlib.steps.network import IfUpDownHelper, IpTablesHelper -from frostfs_testlib.storage.cluster import Cluster, ClusterNode, StorageNode +from frostfs_testlib.storage.cluster import Cluster, ClusterNode, S3Gate, StorageNode from frostfs_testlib.storage.controllers.disk_controller import DiskController from frostfs_testlib.storage.dataclasses.node_base import NodeBase, ServiceClass from frostfs_testlib.testing import parallel -from frostfs_testlib.testing.test_control import run_optionally +from frostfs_testlib.testing.test_control import run_optionally, wait_for_success from frostfs_testlib.utils.failover_utils import ( wait_all_storage_nodes_returned, wait_for_host_offline, @@ -22,18 +22,36 @@ if_up_down_helper = IfUpDownHelper() class ClusterStateController: - def __init__(self, shell: Shell, cluster: Cluster) -> None: + def __init__(self, shell: Shell, cluster: Cluster, healthcheck: Healthcheck) -> None: self.stopped_nodes: list[ClusterNode] = [] self.detached_disks: dict[str, DiskController] = {} - self.stopped_storage_nodes: list[ClusterNode] = [] - self.stopped_s3_gates: list[ClusterNode] = [] self.dropped_traffic: list[ClusterNode] = [] self.stopped_services: set[NodeBase] = set() self.cluster = cluster + self.healthcheck = healthcheck self.shell = shell self.suspended_services: dict[str, list[ClusterNode]] = {} self.nodes_with_modified_interface: list[ClusterNode] = [] + def _get_stopped_by_node(self, node: ClusterNode) -> set[NodeBase]: + stopped_by_node = [svc for svc in self.stopped_services if svc.host == node.host] + return set(stopped_by_node) + + def _get_stopped_by_type(self, service_type: type[ServiceClass]) -> set[ServiceClass]: + stopped_by_type = [svc for svc in self.stopped_services if isinstance(svc, service_type)] + return set(stopped_by_type) + + def _from_stopped_nodes(self, service_type: type[ServiceClass]) -> set[ServiceClass]: + stopped_on_nodes = set([node.service(service_type) for node in self.stopped_nodes]) + return set(stopped_on_nodes) + + def _get_online(self, service_type: type[ServiceClass]) -> set[ServiceClass]: + stopped_svc = self._get_stopped_by_type(service_type).union( + self._from_stopped_nodes(service_type) + ) + online_svc = set(self.cluster.services(service_type)) - stopped_svc + return online_svc + @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @reporter.step_deco("Stop host of node {node}") def stop_node_host(self, node: ClusterNode, mode: str): @@ -65,26 +83,6 @@ class ClusterStateController: for node in nodes: wait_for_host_offline(self.shell, node.storage_node) - @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) - @reporter.step_deco("Stop all storage services on cluster") - def stop_all_storage_services(self, reversed_order: bool = False): - nodes = ( - reversed(self.cluster.cluster_nodes) if reversed_order else self.cluster.cluster_nodes - ) - - for node in nodes: - self.stop_storage_service(node) - - @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) - @reporter.step_deco("Stop all S3 gates on cluster") - def stop_all_s3_gates(self, reversed_order: bool = False): - nodes = ( - reversed(self.cluster.cluster_nodes) if reversed_order else self.cluster.cluster_nodes - ) - - for node in nodes: - self.stop_s3_gate(node) - @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @reporter.step_deco("Start host of node {node}") def start_node_host(self, node: ClusterNode): @@ -104,13 +102,10 @@ class ClusterStateController: for node in nodes: with reporter.step(f"Start host {node.host.config.address}"): node.host.start_host() - if node in self.stopped_storage_nodes: - self.stopped_storage_nodes.remove(node) + self.stopped_services.difference_update(self._get_stopped_by_node(node)) - if node in self.stopped_s3_gates: - self.stopped_s3_gates.remove(node) self.stopped_nodes = [] - wait_all_storage_nodes_returned(self.shell, self.cluster) + self.wait_after_storage_startup() @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @reporter.step_deco("Detach disk {device} at {mountpoint} on node {node}") @@ -133,42 +128,57 @@ class ClusterStateController: disk_controller.attach() self.detached_disks = {} - @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) - @reporter.step_deco("Stop storage service on {node}") - def stop_storage_service(self, node: ClusterNode, mask: bool = True): - self.stopped_storage_nodes.append(node) - node.storage_node.stop_service(mask) - @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @reporter.step_deco("Stop all {service_type} services") - def stop_services_of_type(self, service_type: type[ServiceClass]): + def stop_services_of_type(self, service_type: type[ServiceClass], mask: bool = True): services = self.cluster.services(service_type) self.stopped_services.update(services) - parallel([service.stop_service for service in services]) + parallel([service.stop_service for service in services], mask=mask) @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @reporter.step_deco("Start all {service_type} services") def start_services_of_type(self, service_type: type[ServiceClass]): services = self.cluster.services(service_type) parallel([service.start_service for service in services]) + self.stopped_services.difference_update(set(services)) if service_type == StorageNode: - wait_all_storage_nodes_returned(self.shell, self.cluster) + self.wait_after_storage_startup() - self.stopped_services = self.stopped_services - set(services) + @wait_for_success(600, 60) + def wait_s3gate(self, s3gate: S3Gate): + with reporter.step(f"Wait for {s3gate} reconnection"): + result = s3gate.get_metric("frostfs_s3_gw_pool_current_nodes") + assert ( + 'address="127.0.0.1' in result.stdout + ), "S3Gate should connect to local storage node" + + @reporter.step_deco("Wait for S3Gates reconnection to local storage") + def wait_s3gates(self): + online_s3gates = self._get_online(S3Gate) + parallel(self.wait_s3gate, online_s3gates) + + @wait_for_success(600, 60) + def wait_tree_healthcheck(self): + nodes = self.cluster.nodes(self._get_online(StorageNode)) + parallel(self.healthcheck.tree_healthcheck, nodes) + + @reporter.step_deco("Wait for storage reconnection to the system") + def wait_after_storage_startup(self): + wait_all_storage_nodes_returned(self.shell, self.cluster) + self.wait_s3gates() + self.wait_tree_healthcheck() @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @reporter.step_deco("Start all stopped services") def start_all_stopped_services(self): + stopped_storages = self._get_stopped_by_type(StorageNode) parallel([service.start_service for service in self.stopped_services]) - - for service in self.stopped_services: - if isinstance(service, StorageNode): - wait_all_storage_nodes_returned(self.shell, self.cluster) - break - self.stopped_services.clear() + if stopped_storages: + self.wait_after_storage_startup() + @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @reporter.step_deco("Stop {service_type} service on {node}") def stop_service_of_type( @@ -183,50 +193,78 @@ class ClusterStateController: def start_service_of_type(self, node: ClusterNode, service_type: type[ServiceClass]): service = node.service(service_type) service.start_service() - if service in self.stopped_services: - self.stopped_services.remove(service) + self.stopped_services.discard(service) + @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) + @reporter.step_deco("Start all stopped {service_type} services") + def start_stopped_services_of_type(self, service_type: type[ServiceClass]): + stopped_svc = self._get_stopped_by_type(service_type) + if not stopped_svc: + return + + parallel([svc.start_service for svc in stopped_svc]) + self.stopped_services.difference_update(stopped_svc) + + if service_type == StorageNode: + self.wait_after_storage_startup() + + # TODO: Deprecated + @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) + @reporter.step_deco("Stop all storage services on cluster") + def stop_all_storage_services(self, reversed_order: bool = False): + nodes = ( + reversed(self.cluster.cluster_nodes) if reversed_order else self.cluster.cluster_nodes + ) + + for node in nodes: + self.stop_service_of_type(node, StorageNode) + + # TODO: Deprecated + @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) + @reporter.step_deco("Stop all S3 gates on cluster") + def stop_all_s3_gates(self, reversed_order: bool = False): + nodes = ( + reversed(self.cluster.cluster_nodes) if reversed_order else self.cluster.cluster_nodes + ) + + for node in nodes: + self.stop_service_of_type(node, S3Gate) + + # TODO: Deprecated + @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) + @reporter.step_deco("Stop storage service on {node}") + def stop_storage_service(self, node: ClusterNode, mask: bool = True): + self.stop_service_of_type(node, StorageNode, mask) + + # TODO: Deprecated @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @reporter.step_deco("Start storage service on {node}") def start_storage_service(self, node: ClusterNode): - node.storage_node.start_service() - self.stopped_storage_nodes.remove(node) + self.start_service_of_type(node, StorageNode) + # TODO: Deprecated @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @reporter.step_deco("Start stopped storage services") def start_stopped_storage_services(self): - if not self.stopped_storage_nodes: - return - - # In case if we stopped couple services, for example (s01-s04): - # After starting only s01, it may require connections to s02-s04, which is still down, and fail to start. - # Also, if something goes wrong here, we might skip s02-s04 start at all, and cluster will be left in a bad state. - # So in order to make sure that services are at least attempted to be started, using parallel runs here. - parallel(self.start_storage_service, copy.copy(self.stopped_storage_nodes)) - - wait_all_storage_nodes_returned(self.shell, self.cluster) - self.stopped_storage_nodes = [] + self.start_stopped_services_of_type(StorageNode) + # TODO: Deprecated @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @reporter.step_deco("Stop s3 gate on {node}") def stop_s3_gate(self, node: ClusterNode, mask: bool = True): - node.s3_gate.stop_service(mask) - self.stopped_s3_gates.append(node) + self.stop_service_of_type(node, S3Gate, mask) + # TODO: Deprecated @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @reporter.step_deco("Start s3 gate on {node}") def start_s3_gate(self, node: ClusterNode): - node.s3_gate.start_service() - self.stopped_s3_gates.remove(node) + self.start_service_of_type(node, S3Gate) + # TODO: Deprecated @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @reporter.step_deco("Start stopped S3 gates") def start_stopped_s3_gates(self): - if not self.stopped_s3_gates: - return - - parallel(self.start_s3_gate, copy.copy(self.stopped_s3_gates)) - self.stopped_s3_gates = [] + self.start_stopped_services_of_type(S3Gate) @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @reporter.step_deco("Suspend {process_name} service in {node}") diff --git a/src/frostfs_testlib/storage/dataclasses/node_base.py b/src/frostfs_testlib/storage/dataclasses/node_base.py index 8708520..1e23c7e 100644 --- a/src/frostfs_testlib/storage/dataclasses/node_base.py +++ b/src/frostfs_testlib/storage/dataclasses/node_base.py @@ -7,6 +7,7 @@ import yaml from frostfs_testlib.hosting.config import ServiceConfig from frostfs_testlib.hosting.interfaces import Host from frostfs_testlib.reporter import get_reporter +from frostfs_testlib.shell.interfaces import CommandResult from frostfs_testlib.storage.constants import ConfigAttributes from frostfs_testlib.testing.readable import HumanReadableABC from frostfs_testlib.utils import wallet_utils @@ -67,6 +68,12 @@ class NodeBase(HumanReadableABC): def service_healthcheck(self) -> bool: """Service healthcheck.""" + # TODO: Migrate to sub-class Metrcis (not yet exists :)) + def get_metric(self, metric: str) -> CommandResult: + shell = self.host.get_shell() + result = shell.exec(f"curl -s {self.get_metrics_endpoint()} | grep -e '^{metric}'") + return result + def get_metrics_endpoint(self) -> str: return self._get_attribute(ConfigAttributes.ENDPOINT_PROMETHEUS) diff --git a/src/frostfs_testlib/utils/__init__.py b/src/frostfs_testlib/utils/__init__.py index 0ac903a..fbc4a8f 100644 --- a/src/frostfs_testlib/utils/__init__.py +++ b/src/frostfs_testlib/utils/__init__.py @@ -3,6 +3,3 @@ import frostfs_testlib.utils.datetime_utils import frostfs_testlib.utils.json_utils import frostfs_testlib.utils.string_utils import frostfs_testlib.utils.wallet_utils - -# TODO: Circullar dependency FileKeeper -> NodeBase -> Utils -> FileKeeper -> NodeBase -from frostfs_testlib.utils.file_keeper import FileKeeper