From 2bad0f1db644c6e6941806e288af05bf0ad95056 Mon Sep 17 00:00:00 2001 From: Andrey Berezin Date: Tue, 30 May 2023 16:32:38 +0300 Subject: [PATCH] Add metabase and write_cache operations Signed-off-by: Andrey Berezin --- pyproject.toml | 2 +- src/frostfs_testlib/hosting/docker_host.py | 8 ++- src/frostfs_testlib/hosting/interfaces.py | 20 ++++++++ src/frostfs_testlib/load/load_report.py | 2 +- src/frostfs_testlib/storage/cluster.py | 15 ++++++ .../controllers/cluster_state_controller.py | 34 ++++++++++--- .../storage/dataclasses/frostfs_services.py | 6 +++ .../storage/dataclasses/node_base.py | 23 ++++++++- src/frostfs_testlib/utils/failover_utils.py | 6 +-- src/frostfs_testlib/utils/file_keeper.py | 50 +++++++++++++++++++ 10 files changed, 152 insertions(+), 14 deletions(-) create mode 100644 src/frostfs_testlib/utils/file_keeper.py diff --git a/pyproject.toml b/pyproject.toml index 69f8e29..9140ee0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,7 +16,7 @@ classifiers = [ ] keywords = ["frostfs", "test"] dependencies = [ - "allure-python-commons>=2.9.45", + "allure-python-commons>=2.13.2", "docker>=4.4.0", "importlib_metadata>=5.0; python_version < '3.10'", "neo-mamba==1.0.0", diff --git a/src/frostfs_testlib/hosting/docker_host.py b/src/frostfs_testlib/hosting/docker_host.py index ccf1b64..b7f4852 100644 --- a/src/frostfs_testlib/hosting/docker_host.py +++ b/src/frostfs_testlib/hosting/docker_host.py @@ -129,12 +129,18 @@ class DockerHost(Host): timeout=service_attributes.start_timeout, ) + def delete_metabase(self, service_name: str) -> None: + raise NotImplementedError("Not implemented for docker") + + def delete_write_cache(self, service_name: str) -> None: + raise NotImplementedError("Not implemented for docker") + def delete_fstree(self, service_name: str) -> None: raise NotImplementedError("Not implemented for docker") def delete_blobovnicza(self, service_name: str) -> None: raise NotImplementedError("Not implemented for docker") - + def delete_pilorama(self, service_name: str) -> None: raise NotImplementedError("Not implemented for docker") diff --git a/src/frostfs_testlib/hosting/interfaces.py b/src/frostfs_testlib/hosting/interfaces.py index 8a617e9..9178523 100644 --- a/src/frostfs_testlib/hosting/interfaces.py +++ b/src/frostfs_testlib/hosting/interfaces.py @@ -131,6 +131,26 @@ class Host(ABC): """ + @abstractmethod + def delete_metabase(self, service_name: str) -> None: + """ + Deletes all metabase*.db in the node. + + Args: + service_name: Name of storage node service. + + """ + + @abstractmethod + def delete_write_cache(self, service_name: str) -> None: + """ + Deletes all write_cache in the node. + + Args: + service_name: Name of storage node service. + + """ + @abstractmethod def delete_blobovnicza(self, service_name: str) -> None: """ diff --git a/src/frostfs_testlib/load/load_report.py b/src/frostfs_testlib/load/load_report.py index c9c23c7..5f22515 100644 --- a/src/frostfs_testlib/load/load_report.py +++ b/src/frostfs_testlib/load/load_report.py @@ -61,7 +61,7 @@ class LoadReport: return html def _calc_unit(self, value: float, skip_units: int = 0) -> Tuple[float, str]: - units = ["B", "KB", "MB", "GB", "TB"] + units = ["B", "KiB", "MiB", "GiB", "TiB"] for unit in units[skip_units:]: if value < 1024: diff --git a/src/frostfs_testlib/storage/cluster.py b/src/frostfs_testlib/storage/cluster.py index ffca778..2158dc2 100644 --- a/src/frostfs_testlib/storage/cluster.py +++ b/src/frostfs_testlib/storage/cluster.py @@ -73,6 +73,21 @@ class ClusterNode: def s3_gate(self) -> S3Gate: return self.service(S3Gate) + def get_config(self, config_file_path: str) -> dict: + shell = self.host.get_shell() + + result = shell.exec(f"cat {config_file_path}") + config_text = result.stdout + + config = yaml.safe_load(config_text) + return config + + def save_config(self, new_config: dict, config_file_path: str) -> None: + shell = self.host.get_shell() + + config_str = yaml.dump(new_config) + shell.exec(f"echo '{config_str}' | sudo tee {config_file_path}") + def service(self, service_type: type[ServiceClass]) -> ServiceClass: """ Get a service cluster node of specified type. diff --git a/src/frostfs_testlib/storage/controllers/cluster_state_controller.py b/src/frostfs_testlib/storage/controllers/cluster_state_controller.py index 7782e64..705caf0 100644 --- a/src/frostfs_testlib/storage/controllers/cluster_state_controller.py +++ b/src/frostfs_testlib/storage/controllers/cluster_state_controller.py @@ -1,11 +1,11 @@ import time +from concurrent.futures import ThreadPoolExecutor import frostfs_testlib.resources.optionals as optionals from frostfs_testlib.reporter import get_reporter from frostfs_testlib.shell import CommandOptions, Shell from frostfs_testlib.steps import epoch from frostfs_testlib.storage.cluster import Cluster, ClusterNode, StorageNode -from frostfs_testlib.steps.node_management import wait_for_node_to_be_ready from frostfs_testlib.storage.controllers.disk_controller import DiskController from frostfs_testlib.testing.test_control import run_optionally, wait_for_success from frostfs_testlib.utils.failover_utils import ( @@ -22,7 +22,7 @@ class ClusterStateController: def __init__(self, shell: Shell, cluster: Cluster) -> None: self.stopped_nodes: list[ClusterNode] = [] self.detached_disks: dict[str, DiskController] = {} - self.stopped_storage_nodes: list[StorageNode] = [] + self.stopped_storage_nodes: list[ClusterNode] = [] self.cluster = cluster self.shell = shell @@ -48,6 +48,16 @@ class ClusterStateController: for node in nodes: wait_for_host_offline(self.shell, node.storage_node) + @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) + @reporter.step_deco("Stop all storage services on cluster") + def stop_all_storage_services(self, reversed_order: bool = False): + nodes = ( + reversed(self.cluster.cluster_nodes) if reversed_order else self.cluster.cluster_nodes + ) + + for node in nodes: + self.stop_storage_service(node) + @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @reporter.step_deco("Start host of node {node}") def start_node_host(self, node: ClusterNode): @@ -92,19 +102,31 @@ class ClusterStateController: @reporter.step_deco("Stop storage service on {node}") def stop_storage_service(self, node: ClusterNode): node.storage_node.stop_service() - self.stopped_storage_nodes.append(node.storage_node) + self.stopped_storage_nodes.append(node) @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @reporter.step_deco("Start storage service on {node}") def start_storage_service(self, node: ClusterNode): node.storage_node.start_service() - self.stopped_storage_nodes.remove(node.storage_node) + self.stopped_storage_nodes.remove(node) @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @reporter.step_deco("Start stopped storage services") def start_stopped_storage_services(self): - for node in self.stopped_storage_nodes: - node.start_service() + if self.stopped_storage_nodes: + # In case if we stopped couple services, for example (s01-s04): + # After starting only s01, it may require connections to s02-s04, which is still down, and fail to start. + # Also, if something goes wrong here, we might skip s02-s04 start at all, and cluster will be left in a bad state. + # So in order to make sure that services are at least attempted to be started, using threads here. + with ThreadPoolExecutor(max_workers=len(self.stopped_storage_nodes)) as executor: + start_result = executor.map(self.start_storage_service, self.stopped_storage_nodes) + + # Looks tricky, but if exception is raised in any thread, it will be "eaten" by ThreadPoolExecutor, + # But will be thrown here. + # Not ideal solution, but okay for now + for _ in start_result: + pass + wait_all_storage_nodes_returned(self.shell, self.cluster) self.stopped_storage_nodes = [] diff --git a/src/frostfs_testlib/storage/dataclasses/frostfs_services.py b/src/frostfs_testlib/storage/dataclasses/frostfs_services.py index e87d264..7bb4c2b 100644 --- a/src/frostfs_testlib/storage/dataclasses/frostfs_services.py +++ b/src/frostfs_testlib/storage/dataclasses/frostfs_services.py @@ -177,6 +177,12 @@ class StorageNode(NodeBase): def delete_pilorama(self): self.host.delete_pilorama(self.name) + def delete_metabase(self): + self.host.delete_metabase(self.name) + + def delete_write_cache(self): + self.host.delete_write_cache(self.name) + @property def label(self) -> str: return f"{self.name}: {self.get_rpc_endpoint()}" diff --git a/src/frostfs_testlib/storage/dataclasses/node_base.py b/src/frostfs_testlib/storage/dataclasses/node_base.py index 0d96775..8fcb03b 100644 --- a/src/frostfs_testlib/storage/dataclasses/node_base.py +++ b/src/frostfs_testlib/storage/dataclasses/node_base.py @@ -1,6 +1,8 @@ from abc import ABC, abstractmethod from dataclasses import dataclass -from typing import Optional, TypedDict, TypeVar +from typing import Optional, Tuple, TypedDict, TypeVar + +import yaml from frostfs_testlib.hosting.config import ServiceConfig from frostfs_testlib.hosting.interfaces import Host @@ -84,12 +86,29 @@ class NodeBase(ABC): ConfigAttributes.CONFIG_PATH, ) - def get_wallet_config_path(self): + def get_wallet_config_path(self) -> str: return self._get_attribute( ConfigAttributes.LOCAL_WALLET_CONFIG, ConfigAttributes.WALLET_CONFIG, ) + def get_config(self) -> Tuple[str, dict]: + config_file_path = self._get_attribute(ConfigAttributes.CONFIG_PATH) + shell = self.host.get_shell() + + result = shell.exec(f"cat {config_file_path}") + config_text = result.stdout + + config = yaml.safe_load(config_text) + return config_file_path, config + + def save_config(self, new_config: dict) -> None: + config_file_path = self._get_attribute(ConfigAttributes.CONFIG_PATH) + shell = self.host.get_shell() + + config_str = yaml.dump(new_config) + shell.exec(f"echo '{config_str}' | sudo tee {config_file_path}") + def get_wallet_public_key(self): storage_wallet_path = self.get_wallet_path() storage_wallet_pass = self.get_wallet_password() diff --git a/src/frostfs_testlib/utils/failover_utils.py b/src/frostfs_testlib/utils/failover_utils.py index 3910662..8c6062f 100644 --- a/src/frostfs_testlib/utils/failover_utils.py +++ b/src/frostfs_testlib/utils/failover_utils.py @@ -28,8 +28,8 @@ def ping_host(shell: Shell, host: Host): @reporter.step_deco("Wait for storage nodes returned to cluster") def wait_all_storage_nodes_returned(shell: Shell, cluster: Cluster) -> None: - with reporter.step("Run health check for all storage nodes"): - for node in cluster.services(StorageNode): + for node in cluster.services(StorageNode): + with reporter.step(f"Run health check for storage at '{node}'"): wait_for_host_online(shell, node) wait_for_node_online(node) @@ -56,7 +56,7 @@ def wait_for_host_offline(shell: Shell, node: StorageNode): return 0 -@retry(max_attempts=10, sleep_interval=15, expected_result=True) +@retry(max_attempts=20, sleep_interval=30, expected_result=True) @reporter.step_deco("Waiting for node {node} to go online") def wait_for_node_online(node: StorageNode): try: diff --git a/src/frostfs_testlib/utils/file_keeper.py b/src/frostfs_testlib/utils/file_keeper.py new file mode 100644 index 0000000..ad6836b --- /dev/null +++ b/src/frostfs_testlib/utils/file_keeper.py @@ -0,0 +1,50 @@ +from concurrent.futures import ThreadPoolExecutor + +from frostfs_testlib.reporter import get_reporter +from frostfs_testlib.storage.dataclasses.node_base import NodeBase + +reporter = get_reporter() + + +class FileKeeper: + """This class is responsible to make backup copy of modified file and restore when required (mostly after the test)""" + + files_to_restore: dict[NodeBase, list[str]] = {} + + @reporter.step_deco("Adding {file_to_restore} from node {node} to restore list") + def add(self, node: NodeBase, file_to_restore: str): + if node in self.files_to_restore and file_to_restore in self.files_to_restore[node]: + # Already added + return + + if node not in self.files_to_restore: + self.files_to_restore[node] = [] + + if file_to_restore not in self.files_to_restore[node]: + self.files_to_restore[node].append(file_to_restore) + + shell = node.host.get_shell() + shell.exec(f"cp {file_to_restore} {file_to_restore}.bak") + + @reporter.step_deco("Restore files") + def restore_files(self): + nodes = self.files_to_restore.keys() + if not nodes: + return + + with ThreadPoolExecutor(max_workers=len(nodes)) as executor: + results = executor.map(self._restore_files_on_node, nodes) + + self.files_to_restore.clear() + + for _ in results: + # Iterate through results for exception check if any + pass + + @reporter.step_deco("Restore files on node {node}") + def _restore_files_on_node(self, node: NodeBase): + shell = node.host.get_shell() + for file_to_restore in self.files_to_restore[node]: + with reporter.step(f"Restore file {file_to_restore} on node {node}"): + shell.exec(f"cp {file_to_restore}.bak {file_to_restore}") + shell.exec(f"rm {file_to_restore}.bak")