From 4bfe99f13cc4a76aa0dc809dab3ae33c23ab2abb Mon Sep 17 00:00:00 2001 From: "m.malygina" Date: Thu, 24 Aug 2023 13:23:58 +0300 Subject: [PATCH] Stop cm before wipe data, start cm after wipe data Signed-off-by: m.malygina --- src/frostfs_testlib/storage/__init__.py | 3 +- src/frostfs_testlib/storage/cluster.py | 7 +- src/frostfs_testlib/storage/constants.py | 2 + .../controllers/cluster_state_controller.py | 134 ++++++++---------- .../storage/dataclasses/frostfs_services.py | 18 ++- 5 files changed, 83 insertions(+), 81 deletions(-) diff --git a/src/frostfs_testlib/storage/__init__.py b/src/frostfs_testlib/storage/__init__.py index 3562d25..7f08640 100644 --- a/src/frostfs_testlib/storage/__init__.py +++ b/src/frostfs_testlib/storage/__init__.py @@ -5,6 +5,7 @@ from frostfs_testlib.storage.dataclasses.frostfs_services import ( MorphChain, S3Gate, StorageNode, + CM ) from frostfs_testlib.storage.service_registry import ServiceRegistry @@ -16,7 +17,7 @@ __class_registry.register_service(_FrostfsServicesNames.INNER_RING, InnerRing) __class_registry.register_service(_FrostfsServicesNames.MORPH_CHAIN, MorphChain) __class_registry.register_service(_FrostfsServicesNames.S3_GATE, S3Gate) __class_registry.register_service(_FrostfsServicesNames.HTTP_GATE, HTTPGate) - +__class_registry.register_service(_FrostfsServicesNames.CM, CM) def get_service_registry() -> ServiceRegistry: """Returns registry with registered classes related to cluster and cluster nodes. 
diff --git a/src/frostfs_testlib/storage/cluster.py b/src/frostfs_testlib/storage/cluster.py index 0e24ebb..8ed9c6e 100644 --- a/src/frostfs_testlib/storage/cluster.py +++ b/src/frostfs_testlib/storage/cluster.py @@ -15,6 +15,7 @@ from frostfs_testlib.storage.dataclasses.frostfs_services import ( MorphChain, S3Gate, StorageNode, + CM ) from frostfs_testlib.storage.dataclasses.node_base import NodeBase, ServiceClass from frostfs_testlib.storage.service_registry import ServiceRegistry @@ -76,6 +77,9 @@ class ClusterNode: @property def s3_gate(self) -> S3Gate: return self.service(S3Gate) + + def cm(self) -> CM: + return self.service(CM) def get_config(self, config_file_path: str) -> dict: shell = self.host.get_shell() @@ -98,7 +102,7 @@ class ClusterNode: Args: service_type: type of the service which should be returned, - for frostfs it can be StorageNode, S3Gate, HttpGate, MorphChain and InnerRing. + for frostfs it can be StorageNode, S3Gate, HttpGate, MorphChain, InnerRing and CM. Returns: service of service_type class. 
@@ -132,6 +136,7 @@ class Cluster: default_http_gate_endpoint: str default_http_hostname: str default_s3_hostname: str + default_cm_hostname: str def __init__(self, hosting: Hosting) -> None: self._hosting = hosting diff --git a/src/frostfs_testlib/storage/constants.py b/src/frostfs_testlib/storage/constants.py index 6deedfb..af0466f 100644 --- a/src/frostfs_testlib/storage/constants.py +++ b/src/frostfs_testlib/storage/constants.py @@ -13,6 +13,7 @@ class ConfigAttributes: UN_LOCODE = "un_locode" HTTP_HOSTNAME = "http_hostname" S3_HOSTNAME = "s3_hostname" + CM_HOSTNAME = "cm_hostname" class _FrostfsServicesNames: @@ -21,3 +22,4 @@ class _FrostfsServicesNames: HTTP_GATE = "http-gate" MORPH_CHAIN = "morph-chain" INNER_RING = "ir" + CM = "cm" diff --git a/src/frostfs_testlib/storage/controllers/cluster_state_controller.py b/src/frostfs_testlib/storage/controllers/cluster_state_controller.py index 2d439d9..884f11d 100644 --- a/src/frostfs_testlib/storage/controllers/cluster_state_controller.py +++ b/src/frostfs_testlib/storage/controllers/cluster_state_controller.py @@ -1,15 +1,12 @@ -import copy -import itertools import time +from concurrent.futures import ThreadPoolExecutor import frostfs_testlib.resources.optionals as optionals from frostfs_testlib.reporter import get_reporter from frostfs_testlib.shell import CommandOptions, Shell from frostfs_testlib.steps import epoch -from frostfs_testlib.steps.iptables import IpTablesHelper from frostfs_testlib.storage.cluster import Cluster, ClusterNode, StorageNode from frostfs_testlib.storage.controllers.disk_controller import DiskController -from frostfs_testlib.testing import parallel from frostfs_testlib.testing.test_control import run_optionally, wait_for_success from frostfs_testlib.utils.failover_utils import ( wait_all_storage_nodes_returned, @@ -27,7 +24,7 @@ class ClusterStateController: self.detached_disks: dict[str, DiskController] = {} self.stopped_storage_nodes: list[ClusterNode] = [] self.stopped_s3_gates: 
list[ClusterNode] = [] - self.dropped_traffic: list[ClusterNode] = [] + self.stopped_cms: list[ClusterNode] = [] self.cluster = cluster self.shell = shell self.suspended_services: dict[str, list[ClusterNode]] = {} @@ -73,7 +70,17 @@ class ClusterStateController: for node in nodes: self.stop_s3_gate(node) + + @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) + @reporter.step_deco("Stop all cm services on cluster") + def stop_all_cm_services(self, reversed_order: bool = False): + nodes = ( + reversed(self.cluster.cluster_nodes) if reversed_order else self.cluster.cluster_nodes + ) + for node in nodes: + self.stop_cm_service(node) + @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @reporter.step_deco("Start host of node {node}") def start_node_host(self, node: ClusterNode): @@ -134,6 +141,33 @@ class ClusterStateController: node.storage_node.start_service() self.stopped_storage_nodes.remove(node) + @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) + @reporter.step_deco("Stop cm service on {node}") + def stop_cm_service(self, node: ClusterNode): + node.cm().stop_service() + self.stopped_cms.append(node) + + @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) + @reporter.step_deco("Start cm service on {node}") + def start_cm_service(self, node: ClusterNode): + node.cm().start_service() + self.stopped_cms.remove(node) + + @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) + @reporter.step_deco("Start stopped cm services") + def start_stopped_cm_services(self): + if not self.stopped_cms: + return + + # In case if we stopped couple services, for example (s01-s04): + # After starting only s01, it may require connections to s02-s04, which is still down, and fail to start. + # Also, if something goes wrong here, we might skip s02-s04 start at all, and cluster will be left in a bad state. + # So in order to make sure that services are at least attempted to be started, using parallel runs here. 
+        with ThreadPoolExecutor(max_workers=len(self.stopped_cms)) as executor: list(executor.map(self.start_cm_service, list(self.stopped_cms))) + + wait_all_storage_nodes_returned(self.shell, self.cluster) + self.stopped_cms = [] + @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @reporter.step_deco("Start stopped storage services") def start_stopped_storage_services(self): @@ -143,8 +177,15 @@ # In case if we stopped couple services, for example (s01-s04): # After starting only s01, it may require connections to s02-s04, which is still down, and fail to start. # Also, if something goes wrong here, we might skip s02-s04 start at all, and cluster will be left in a bad state. - # So in order to make sure that services are at least attempted to be started, using parallel runs here. - parallel(self.start_storage_service, copy.copy(self.stopped_storage_nodes)) + # So in order to make sure that services are at least attempted to be started, using threads here. + with ThreadPoolExecutor(max_workers=len(self.stopped_storage_nodes)) as executor: + start_result = executor.map(self.start_storage_service, list(self.stopped_storage_nodes)) + + # Looks tricky, but if exception is raised in any thread, it will be "eaten" by ThreadPoolExecutor, + # But will be thrown here. + # Not ideal solution, but okay for now + for _ in start_result: + pass wait_all_storage_nodes_returned(self.shell, self.cluster) self.stopped_storage_nodes = [] @@ -167,8 +208,14 @@ if not self.stopped_s3_gates: return - parallel(self.start_s3_gate, copy.copy(self.stopped_s3_gates)) - self.stopped_s3_gates = [] + with ThreadPoolExecutor(max_workers=len(self.stopped_s3_gates)) as executor: + start_result = executor.map(self.start_s3_gate, list(self.stopped_s3_gates)) + + # Looks tricky, but if exception is raised in any thread, it will be "eaten" by ThreadPoolExecutor, + # But will be thrown here.
+ # Not ideal solution, but okay for now + for _ in start_result: + pass @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @reporter.step_deco("Suspend {process_name} service in {node}") @@ -195,62 +242,6 @@ class ClusterStateController: [node.host.wait_success_resume_process(process_name) for node in list_nodes] self.suspended_services = {} - @reporter.step_deco("Drop traffic to {node}, with ports - {ports}, nodes - {block_nodes}") - def drop_traffic( - self, - mode: str, - node: ClusterNode, - wakeup_timeout: int, - ports: list[str] = None, - block_nodes: list[ClusterNode] = None, - ) -> None: - allowed_modes = ["ports", "nodes"] - assert mode in allowed_modes - - match mode: - case "ports": - IpTablesHelper.drop_input_traffic_to_port(node, ports) - case "nodes": - list_ip = self._parse_intefaces(block_nodes) - IpTablesHelper.drop_input_traffic_to_node(node, list_ip) - time.sleep(wakeup_timeout) - self.dropped_traffic.append(node) - - @reporter.step_deco("Ping traffic") - def ping_traffic( - self, - node: ClusterNode, - nodes_list: list[ClusterNode], - expect_result: int, - ) -> bool: - shell = node.host.get_shell() - options = CommandOptions(check=False) - ips = self._parse_intefaces(nodes_list) - for ip in ips: - code = shell.exec(f"ping {ip} -c 1", options).return_code - if code != expect_result: - return False - return True - - @reporter.step_deco("Start traffic to {node}") - def restore_traffic( - self, - mode: str, - node: ClusterNode, - ) -> None: - allowed_modes = ["ports", "nodes"] - assert mode in allowed_modes - - match mode: - case "ports": - IpTablesHelper.restore_input_traffic_to_port(node=node) - case "nodes": - IpTablesHelper.restore_input_traffic_to_node(node=node) - - @reporter.step_deco("Restore blocked nodes") - def restore_all_traffic(self): - parallel(self._restore_traffic_to_node, self.dropped_traffic) - @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @reporter.step_deco("Hard reboot host {node} via magic SysRq option") def 
panic_reboot_host(self, node: ClusterNode, wait_for_return: bool = True): @@ -277,16 +268,3 @@ disk_controller = DiskController(node, device, mountpoint) return disk_controller - - def _restore_traffic_to_node(self, node): - IpTablesHelper.restore_input_traffic_to_port(node) - IpTablesHelper.restore_input_traffic_to_node(node) - - def _parse_intefaces(self, nodes: list[ClusterNode]): - interfaces = [] - for node in nodes: - dict_interfaces = node.host.config.interfaces - for type, ip in dict_interfaces.items(): - if "mgmt" not in type: - interfaces.append(ip) - return interfaces diff --git a/src/frostfs_testlib/storage/dataclasses/frostfs_services.py b/src/frostfs_testlib/storage/dataclasses/frostfs_services.py index ccb30d5..986caf0 100644 --- a/src/frostfs_testlib/storage/dataclasses/frostfs_services.py +++ b/src/frostfs_testlib/storage/dataclasses/frostfs_services.py @@ -61,7 +61,6 @@ def label(self) -> str: return f"{self.name}: {self.get_endpoint()}" - class HTTPGate(NodeBase): """ Class represents HTTP gateway in a cluster @@ -152,6 +151,9 @@ def get_s3_hostname(self) -> str: return self._get_attribute(ConfigAttributes.S3_HOSTNAME) + + def get_cm_hostname(self) -> str: + return self._get_attribute(ConfigAttributes.CM_HOSTNAME) def delete_blobovnicza(self): self.host.delete_blobovnicza(self.name) @@ -171,3 +173,18 @@ @property def label(self) -> str: return f"{self.name}: {self.get_rpc_endpoint()}" + +class CM(NodeBase): + """ + Class represents cm service in a cluster + """ + @property + def label(self) -> str: + return f"{self.name}" + + def service_healthcheck(self) -> bool: + """Service healthcheck stub: cm exposes no dedicated check yet, assume healthy.""" + return True + + def get_cm_hostname(self) -> str: + return self._get_attribute(ConfigAttributes.CM_HOSTNAME)