From 5687b79b388b275e784612568455bb736702693d Mon Sep 17 00:00:00 2001 From: Andrey Berezin Date: Tue, 31 Oct 2023 14:17:54 +0300 Subject: [PATCH] [#109] Update CSC with healthchecks (cherry picked from commit e970fe2788949673913d16a73fbbb738829a9515) Signed-off-by: Dmitry Anurin --- .../healthcheck/basic_healthcheck.py | 35 +++- src/frostfs_testlib/healthcheck/interfaces.py | 8 + src/frostfs_testlib/load/runners.py | 92 +++++---- .../controllers/cluster_state_controller.py | 180 +++++++++++------- .../storage/dataclasses/node_base.py | 7 + src/frostfs_testlib/utils/__init__.py | 3 - 6 files changed, 212 insertions(+), 113 deletions(-) diff --git a/src/frostfs_testlib/healthcheck/basic_healthcheck.py b/src/frostfs_testlib/healthcheck/basic_healthcheck.py index 3f4bc79..9c1d151 100644 --- a/src/frostfs_testlib/healthcheck/basic_healthcheck.py +++ b/src/frostfs_testlib/healthcheck/basic_healthcheck.py @@ -1,5 +1,7 @@ +from frostfs_testlib.cli.frostfs_cli.cli import FrostfsCli from frostfs_testlib.healthcheck.interfaces import Healthcheck from frostfs_testlib.reporter import get_reporter +from frostfs_testlib.resources.cli import FROSTFS_CLI_EXEC from frostfs_testlib.steps.node_management import storage_node_healthcheck from frostfs_testlib.storage.cluster import ClusterNode @@ -9,6 +11,33 @@ reporter = get_reporter() class BasicHealthcheck(Healthcheck): @reporter.step_deco("Perform healthcheck for {cluster_node}") def perform(self, cluster_node: ClusterNode): - health_check = storage_node_healthcheck(cluster_node.storage_node) - if health_check.health_status != "READY" or health_check.network_status != "ONLINE": - raise AssertionError("Node {cluster_node} is not healthy") + result = self.storage_healthcheck(cluster_node) + if result: + raise AssertionError(result) + + @reporter.step_deco("Tree healthcheck on {cluster_node}") + def tree_healthcheck(self, cluster_node: ClusterNode) -> str | None: + host = cluster_node.host + service_config = host.get_service_config(cluster_node.storage_node.name) + wallet_path = service_config.attributes["wallet_path"] + wallet_password = service_config.attributes["wallet_password"] + + shell = host.get_shell() + wallet_config_path = f"/tmp/{cluster_node.storage_node.name}-config.yaml" + wallet_config = f'wallet: {wallet_path}\npassword: "{wallet_password}"' + shell.exec(f"echo '{wallet_config}' > {wallet_config_path}") + + remote_cli = FrostfsCli( + shell, + host.get_cli_config(FROSTFS_CLI_EXEC).exec_path, + config_file=wallet_config_path, + ) + result = remote_cli.tree.healthcheck(rpc_endpoint="127.0.0.1:8080") + if result.return_code != 0: + return f"Error during tree healthcheck (rc={result.return_code}): {result.stdout}. \n Stderr: {result.stderr}" + + @reporter.step_deco("Storage healthcheck on {cluster_node}") + def storage_healthcheck(self, cluster_node: ClusterNode) -> str | None: + result = storage_node_healthcheck(cluster_node.storage_node) + if result.health_status != "READY" or result.network_status != "ONLINE": + return f"Node {cluster_node} is not healthy. Health={result.health_status}. Network={result.network_status}" diff --git a/src/frostfs_testlib/healthcheck/interfaces.py b/src/frostfs_testlib/healthcheck/interfaces.py index 0c77957..a036a82 100644 --- a/src/frostfs_testlib/healthcheck/interfaces.py +++ b/src/frostfs_testlib/healthcheck/interfaces.py @@ -7,3 +7,11 @@ class Healthcheck(ABC): @abstractmethod def perform(self, cluster_node: ClusterNode): """Perform healthcheck on the target cluster node""" + + @abstractmethod + def tree_healthcheck(self, cluster_node: ClusterNode): + """Check tree sync status on target cluster node""" + + @abstractmethod + def storage_healthcheck(self, cluster_node: ClusterNode): + """Perform storage node healthcheck on target cluster node""" diff --git a/src/frostfs_testlib/load/runners.py b/src/frostfs_testlib/load/runners.py index b65f129..4c07100 100644 --- a/src/frostfs_testlib/load/runners.py +++ b/src/frostfs_testlib/load/runners.py @@ -3,7 +3,6 @@ import itertools import math import re import time -from concurrent.futures import ThreadPoolExecutor from dataclasses import fields from typing import Optional from urllib.parse import urlparse @@ -24,13 +23,16 @@ from frostfs_testlib.resources.load_params import ( LOAD_NODE_SSH_USER, LOAD_NODES, ) +from frostfs_testlib.shell.command_inspectors import SuInspector from frostfs_testlib.shell.interfaces import CommandOptions, InteractiveInput from frostfs_testlib.storage.cluster import ClusterNode from frostfs_testlib.storage.controllers.cluster_state_controller import ClusterStateController from frostfs_testlib.storage.dataclasses.frostfs_services import S3Gate, StorageNode from frostfs_testlib.storage.dataclasses.wallet import WalletInfo from frostfs_testlib.testing import parallel, run_optionally -from frostfs_testlib.utils import FileKeeper, datetime_utils +from frostfs_testlib.testing.test_control import retry +from frostfs_testlib.utils import datetime_utils +from frostfs_testlib.utils.file_keeper import FileKeeper reporter = get_reporter() @@ -296,40 +298,53 @@ class LocalRunner(RunnerBase): nodes_under_load: list[ClusterNode], k6_dir: str, ): - @reporter.step_deco("Prepare node {cluster_node}") - def prepare_node(cluster_node: ClusterNode): - shell = cluster_node.host.get_shell() + parallel(self.prepare_node, nodes_under_load, k6_dir, load_params) - with reporter.step("Allow storage user to login into system"): - shell.exec(f"sudo chsh -s /bin/bash {STORAGE_USER_NAME}") - shell.exec("sudo chattr +i /etc/passwd") + @retry(3, 5, expected_result=True) + def allow_user_to_login_in_system(self, cluster_node: ClusterNode): + shell = cluster_node.host.get_shell() - with reporter.step("Update limits.conf"): - limits_path = "/etc/security/limits.conf" - self.file_keeper.add(cluster_node.storage_node, limits_path) - content = f"{STORAGE_USER_NAME} hard nofile 65536\n{STORAGE_USER_NAME} soft nofile 65536\n" - shell.exec(f"echo '{content}' | sudo tee {limits_path}") + result = None + try: + shell.exec(f"sudo chsh -s /bin/bash {STORAGE_USER_NAME}") + self.lock_passwd_on_node(cluster_node) + options = CommandOptions(check=False, extra_inspectors=[SuInspector(STORAGE_USER_NAME)]) + result = shell.exec("whoami", options) + finally: + if not result or result.return_code: + self.restore_passwd_on_node(cluster_node) + return False - with reporter.step("Download K6"): - shell.exec(f"sudo rm -rf {k6_dir};sudo mkdir {k6_dir}") - shell.exec(f"sudo curl -so {k6_dir}/k6.tar.gz {load_params.k6_url}") - shell.exec(f"sudo tar xf {k6_dir}/k6.tar.gz -C {k6_dir}") - shell.exec(f"sudo chmod -R 777 {k6_dir}") + return True - with reporter.step("Create empty_passwd"): - self.wallet = WalletInfo( - f"{k6_dir}/scenarios/files/wallet.json", "", "/tmp/empty_passwd.yml" - ) - content = yaml.dump({"password": ""}) - shell.exec(f'echo "{content}" | sudo tee {self.wallet.config_path}') - shell.exec(f"sudo chmod -R 777 {self.wallet.config_path}") + @reporter.step_deco("Prepare node {cluster_node}") + def prepare_node(self, cluster_node: ClusterNode, k6_dir: str, load_params: LoadParams): + shell = cluster_node.host.get_shell() - with ThreadPoolExecutor(max_workers=len(nodes_under_load)) as executor: - result = executor.map(prepare_node, nodes_under_load) + with reporter.step("Allow storage user to login into system"): + self.allow_user_to_login_in_system(cluster_node) - # Check for exceptions - for _ in result: - pass + with reporter.step("Update limits.conf"): + limits_path = "/etc/security/limits.conf" + self.file_keeper.add(cluster_node.storage_node, limits_path) + content = ( + f"{STORAGE_USER_NAME} hard nofile 65536\n{STORAGE_USER_NAME} soft nofile 65536\n" + ) + shell.exec(f"echo '{content}' | sudo tee {limits_path}") + + with reporter.step("Download K6"): + shell.exec(f"sudo rm -rf {k6_dir};sudo mkdir {k6_dir}") + shell.exec(f"sudo curl -so {k6_dir}/k6.tar.gz {load_params.k6_url}") + shell.exec(f"sudo tar xf {k6_dir}/k6.tar.gz -C {k6_dir}") + shell.exec(f"sudo chmod -R 777 {k6_dir}") + + with reporter.step("Create empty_passwd"): + self.wallet = WalletInfo( + f"{k6_dir}/scenarios/files/wallet.json", "", "/tmp/empty_passwd.yml" + ) + content = yaml.dump({"password": ""}) + shell.exec(f'echo "{content}" | sudo tee {self.wallet.config_path}') + shell.exec(f"sudo chmod -R 777 {self.wallet.config_path}") @reporter.step_deco("Init k6 instances") def init_k6_instances(self, load_params: LoadParams, endpoints: list[str], k6_dir: str): @@ -379,16 +394,21 @@ class LocalRunner(RunnerBase): ): time.sleep(wait_after_start_time) + @reporter.step_deco("Restore passwd on {cluster_node}") + def restore_passwd_on_node(self, cluster_node: ClusterNode): + shell = cluster_node.host.get_shell() + shell.exec("sudo chattr -i /etc/passwd") + + @reporter.step_deco("Lock passwd on {cluster_node}") + def lock_passwd_on_node(self, cluster_node: ClusterNode): + shell = cluster_node.host.get_shell() + shell.exec("sudo chattr +i /etc/passwd") + def stop(self): for k6_instance in self.k6_instances: k6_instance.stop() - @reporter.step_deco("Restore passwd on {cluster_node}") - def restore_passwd_attr_on_node(cluster_node: ClusterNode): - shell = cluster_node.host.get_shell() - shell.exec("sudo chattr -i /etc/passwd") - - parallel(restore_passwd_attr_on_node, self.nodes_under_load) + parallel(self.restore_passwd_on_node, self.nodes_under_load) self.cluster_state_controller.start_stopped_storage_services() self.cluster_state_controller.start_stopped_s3_gates() diff --git a/src/frostfs_testlib/storage/controllers/cluster_state_controller.py b/src/frostfs_testlib/storage/controllers/cluster_state_controller.py index deb8c7f..2cf1451 100644 --- a/src/frostfs_testlib/storage/controllers/cluster_state_controller.py +++ b/src/frostfs_testlib/storage/controllers/cluster_state_controller.py @@ -1,15 +1,15 @@ -import copy import time import frostfs_testlib.resources.optionals as optionals +from frostfs_testlib.healthcheck.interfaces import Healthcheck from frostfs_testlib.reporter import get_reporter from frostfs_testlib.shell import CommandOptions, Shell, SshConnectionProvider from frostfs_testlib.steps.network import IfUpDownHelper, IpTablesHelper -from frostfs_testlib.storage.cluster import Cluster, ClusterNode, StorageNode +from frostfs_testlib.storage.cluster import Cluster, ClusterNode, S3Gate, StorageNode from frostfs_testlib.storage.controllers.disk_controller import DiskController from frostfs_testlib.storage.dataclasses.node_base import NodeBase, ServiceClass from frostfs_testlib.testing import parallel -from frostfs_testlib.testing.test_control import run_optionally +from frostfs_testlib.testing.test_control import run_optionally, wait_for_success from frostfs_testlib.utils.failover_utils import ( wait_all_storage_nodes_returned, wait_for_host_offline, @@ -22,18 +22,36 @@ if_up_down_helper = IfUpDownHelper() class ClusterStateController: - def __init__(self, shell: Shell, cluster: Cluster) -> None: + def __init__(self, shell: Shell, cluster: Cluster, healthcheck: Healthcheck) -> None: self.stopped_nodes: list[ClusterNode] = [] self.detached_disks: dict[str, DiskController] = {} - self.stopped_storage_nodes: list[ClusterNode] = [] - self.stopped_s3_gates: list[ClusterNode] = [] self.dropped_traffic: list[ClusterNode] = [] self.stopped_services: set[NodeBase] = set() self.cluster = cluster + self.healthcheck = healthcheck self.shell = shell self.suspended_services: dict[str, list[ClusterNode]] = {} self.nodes_with_modified_interface: list[ClusterNode] = [] + def _get_stopped_by_node(self, node: ClusterNode) -> set[NodeBase]: + stopped_by_node = [svc for svc in self.stopped_services if svc.host == node.host] + return set(stopped_by_node) + + def _get_stopped_by_type(self, service_type: type[ServiceClass]) -> set[ServiceClass]: + stopped_by_type = [svc for svc in self.stopped_services if isinstance(svc, service_type)] + return set(stopped_by_type) + + def _from_stopped_nodes(self, service_type: type[ServiceClass]) -> set[ServiceClass]: + stopped_on_nodes = set([node.service(service_type) for node in self.stopped_nodes]) + return set(stopped_on_nodes) + + def _get_online(self, service_type: type[ServiceClass]) -> set[ServiceClass]: + stopped_svc = self._get_stopped_by_type(service_type).union( + self._from_stopped_nodes(service_type) + ) + online_svc = set(self.cluster.services(service_type)) - stopped_svc + return online_svc + @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @reporter.step_deco("Stop host of node {node}") def stop_node_host(self, node: ClusterNode, mode: str): @@ -65,26 +83,6 @@ class ClusterStateController: for node in nodes: wait_for_host_offline(self.shell, node.storage_node) - @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) - @reporter.step_deco("Stop all storage services on cluster") - def stop_all_storage_services(self, reversed_order: bool = False): - nodes = ( - reversed(self.cluster.cluster_nodes) if reversed_order else self.cluster.cluster_nodes - ) - - for node in nodes: - self.stop_storage_service(node) - - @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) - @reporter.step_deco("Stop all S3 gates on cluster") - def stop_all_s3_gates(self, reversed_order: bool = False): - nodes = ( - reversed(self.cluster.cluster_nodes) if reversed_order else self.cluster.cluster_nodes - ) - - for node in nodes: - self.stop_s3_gate(node) - @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @reporter.step_deco("Start host of node {node}") def start_node_host(self, node: ClusterNode): @@ -104,13 +102,10 @@ class ClusterStateController: for node in nodes: with reporter.step(f"Start host {node.host.config.address}"): node.host.start_host() - if node in self.stopped_storage_nodes: - self.stopped_storage_nodes.remove(node) + self.stopped_services.difference_update(self._get_stopped_by_node(node)) - if node in self.stopped_s3_gates: - self.stopped_s3_gates.remove(node) self.stopped_nodes = [] - wait_all_storage_nodes_returned(self.shell, self.cluster) + self.wait_after_storage_startup() @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @reporter.step_deco("Detach disk {device} at {mountpoint} on node {node}") @@ -133,42 +128,57 @@ class ClusterStateController: disk_controller.attach() self.detached_disks = {} - @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) - @reporter.step_deco("Stop storage service on {node}") - def stop_storage_service(self, node: ClusterNode, mask: bool = True): - self.stopped_storage_nodes.append(node) - node.storage_node.stop_service(mask) - @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @reporter.step_deco("Stop all {service_type} services") - def stop_services_of_type(self, service_type: type[ServiceClass]): + def stop_services_of_type(self, service_type: type[ServiceClass], mask: bool = True): services = self.cluster.services(service_type) self.stopped_services.update(services) - parallel([service.stop_service for service in services]) + parallel([service.stop_service for service in services], mask=mask) @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @reporter.step_deco("Start all {service_type} services") def start_services_of_type(self, service_type: type[ServiceClass]): services = self.cluster.services(service_type) parallel([service.start_service for service in services]) + self.stopped_services.difference_update(set(services)) if service_type == StorageNode: - wait_all_storage_nodes_returned(self.shell, self.cluster) + self.wait_after_storage_startup() - self.stopped_services = self.stopped_services - set(services) + @wait_for_success(600, 60) + def wait_s3gate(self, s3gate: S3Gate): + with reporter.step(f"Wait for {s3gate} reconnection"): + result = s3gate.get_metric("frostfs_s3_gw_pool_current_nodes") + assert ( + 'address="127.0.0.1' in result.stdout + ), "S3Gate should connect to local storage node" + + @reporter.step_deco("Wait for S3Gates reconnection to local storage") + def wait_s3gates(self): + online_s3gates = self._get_online(S3Gate) + parallel(self.wait_s3gate, online_s3gates) + + @wait_for_success(600, 60) + def wait_tree_healthcheck(self): + nodes = self.cluster.nodes(self._get_online(StorageNode)) + parallel(self.healthcheck.tree_healthcheck, nodes) + + @reporter.step_deco("Wait for storage reconnection to the system") + def wait_after_storage_startup(self): + wait_all_storage_nodes_returned(self.shell, self.cluster) + self.wait_s3gates() + self.wait_tree_healthcheck() @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @reporter.step_deco("Start all stopped services") def start_all_stopped_services(self): + stopped_storages = self._get_stopped_by_type(StorageNode) parallel([service.start_service for service in self.stopped_services]) - - for service in self.stopped_services: - if isinstance(service, StorageNode): - wait_all_storage_nodes_returned(self.shell, self.cluster) - break - self.stopped_services.clear() + if stopped_storages: + self.wait_after_storage_startup() + @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @reporter.step_deco("Stop {service_type} service on {node}") def stop_service_of_type( @@ -183,50 +193,78 @@ class ClusterStateController: def start_service_of_type(self, node: ClusterNode, service_type: type[ServiceClass]): service = node.service(service_type) service.start_service() - if service in self.stopped_services: - self.stopped_services.remove(service) + self.stopped_services.discard(service) + @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) + @reporter.step_deco("Start all stopped {service_type} services") + def start_stopped_services_of_type(self, service_type: type[ServiceClass]): + stopped_svc = self._get_stopped_by_type(service_type) + if not stopped_svc: + return + + parallel([svc.start_service for svc in stopped_svc]) + self.stopped_services.difference_update(stopped_svc) + + if service_type == StorageNode: + self.wait_after_storage_startup() + + # TODO: Deprecated + @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) + @reporter.step_deco("Stop all storage services on cluster") + def stop_all_storage_services(self, reversed_order: bool = False): + nodes = ( + reversed(self.cluster.cluster_nodes) if reversed_order else self.cluster.cluster_nodes + ) + + for node in nodes: + self.stop_service_of_type(node, StorageNode) + + # TODO: Deprecated + @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) + @reporter.step_deco("Stop all S3 gates on cluster") + def stop_all_s3_gates(self, reversed_order: bool = False): + nodes = ( + reversed(self.cluster.cluster_nodes) if reversed_order else self.cluster.cluster_nodes + ) + + for node in nodes: + self.stop_service_of_type(node, S3Gate) + + # TODO: Deprecated + @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) + @reporter.step_deco("Stop storage service on {node}") + def stop_storage_service(self, node: ClusterNode, mask: bool = True): + self.stop_service_of_type(node, StorageNode, mask) + + # TODO: Deprecated @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @reporter.step_deco("Start storage service on {node}") def start_storage_service(self, node: ClusterNode): - node.storage_node.start_service() - self.stopped_storage_nodes.remove(node) + self.start_service_of_type(node, StorageNode) + # TODO: Deprecated @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @reporter.step_deco("Start stopped storage services") def start_stopped_storage_services(self): - if not self.stopped_storage_nodes: - return - - # In case if we stopped couple services, for example (s01-s04): - # After starting only s01, it may require connections to s02-s04, which is still down, and fail to start. - # Also, if something goes wrong here, we might skip s02-s04 start at all, and cluster will be left in a bad state. - # So in order to make sure that services are at least attempted to be started, using parallel runs here. - parallel(self.start_storage_service, copy.copy(self.stopped_storage_nodes)) - - wait_all_storage_nodes_returned(self.shell, self.cluster) - self.stopped_storage_nodes = [] + self.start_stopped_services_of_type(StorageNode) + # TODO: Deprecated @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @reporter.step_deco("Stop s3 gate on {node}") def stop_s3_gate(self, node: ClusterNode, mask: bool = True): - node.s3_gate.stop_service(mask) - self.stopped_s3_gates.append(node) + self.stop_service_of_type(node, S3Gate, mask) + # TODO: Deprecated @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @reporter.step_deco("Start s3 gate on {node}") def start_s3_gate(self, node: ClusterNode): - node.s3_gate.start_service() - self.stopped_s3_gates.remove(node) + self.start_service_of_type(node, S3Gate) + # TODO: Deprecated @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @reporter.step_deco("Start stopped S3 gates") def start_stopped_s3_gates(self): - if not self.stopped_s3_gates: - return - - parallel(self.start_s3_gate, copy.copy(self.stopped_s3_gates)) - self.stopped_s3_gates = [] + self.start_stopped_services_of_type(S3Gate) @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @reporter.step_deco("Suspend {process_name} service in {node}") diff --git a/src/frostfs_testlib/storage/dataclasses/node_base.py b/src/frostfs_testlib/storage/dataclasses/node_base.py index 8708520..1e23c7e 100644 --- a/src/frostfs_testlib/storage/dataclasses/node_base.py +++ b/src/frostfs_testlib/storage/dataclasses/node_base.py @@ -7,6 +7,7 @@ import yaml from frostfs_testlib.hosting.config import ServiceConfig from frostfs_testlib.hosting.interfaces import Host from frostfs_testlib.reporter import get_reporter +from frostfs_testlib.shell.interfaces import CommandResult from frostfs_testlib.storage.constants import ConfigAttributes from frostfs_testlib.testing.readable import HumanReadableABC from frostfs_testlib.utils import wallet_utils @@ -67,6 +68,12 @@ class NodeBase(HumanReadableABC): def service_healthcheck(self) -> bool: """Service healthcheck.""" + # TODO: Migrate to sub-class Metrcis (not yet exists :)) + def get_metric(self, metric: str) -> CommandResult: + shell = self.host.get_shell() + result = shell.exec(f"curl -s {self.get_metrics_endpoint()} | grep -e '^{metric}'") + return result + def get_metrics_endpoint(self) -> str: return self._get_attribute(ConfigAttributes.ENDPOINT_PROMETHEUS) diff --git a/src/frostfs_testlib/utils/__init__.py b/src/frostfs_testlib/utils/__init__.py index 0ac903a..fbc4a8f 100644 --- a/src/frostfs_testlib/utils/__init__.py +++ b/src/frostfs_testlib/utils/__init__.py @@ -3,6 +3,3 @@ import frostfs_testlib.utils.datetime_utils import frostfs_testlib.utils.json_utils import frostfs_testlib.utils.string_utils import frostfs_testlib.utils.wallet_utils - -# TODO: Circullar dependency FileKeeper -> NodeBase -> Utils -> FileKeeper -> NodeBase -from frostfs_testlib.utils.file_keeper import FileKeeper -- 2.45.2