From 123b5425a86ea7424cefb13ddeadf4727749e969 Mon Sep 17 00:00:00 2001 From: Andrey Berezin Date: Wed, 24 May 2023 20:29:10 +0300 Subject: [PATCH] 1. Increase wait time for k6 teardown after stop signal 2. Remove duplicated code Signed-off-by: Andrey Berezin --- .../controllers/background_load_controller.py | 207 ------------------ .../controllers/cluster_state_controller.py | 130 ----------- src/frostfs_testlib/load/k6.py | 44 ++-- src/frostfs_testlib/load/load_config.py | 5 +- src/frostfs_testlib/resources/load_params.py | 3 + .../storage/controllers/__init__.py | 4 + .../controllers/background_load_controller.py | 12 +- src/frostfs_testlib/testing/test_control.py | 3 +- src/frostfs_testlib/utils/datetime_utils.py | 3 + 9 files changed, 40 insertions(+), 371 deletions(-) delete mode 100644 src/frostfs_testlib/controllers/background_load_controller.py delete mode 100644 src/frostfs_testlib/controllers/cluster_state_controller.py create mode 100644 src/frostfs_testlib/storage/controllers/__init__.py diff --git a/src/frostfs_testlib/controllers/background_load_controller.py b/src/frostfs_testlib/controllers/background_load_controller.py deleted file mode 100644 index 4a97c291..00000000 --- a/src/frostfs_testlib/controllers/background_load_controller.py +++ /dev/null @@ -1,207 +0,0 @@ -import frostfs_testlib.resources.optionals as optionals -from frostfs_testlib.load.k6 import K6 -from frostfs_testlib.load.load_config import ( - EndpointSelectionStrategy, - K6ProcessAllocationStrategy, - LoadParams, - LoadScenario, - LoadType, -) -from frostfs_testlib.load.load_steps import init_s3_client, prepare_k6_instances -from frostfs_testlib.reporter import get_reporter -from frostfs_testlib.resources.load_params import ( - K6_TEARDOWN_PERIOD, - LOAD_NODE_SSH_PASSWORD, - LOAD_NODE_SSH_PRIVATE_KEY_PASSPHRASE, - LOAD_NODE_SSH_PRIVATE_KEY_PATH, - LOAD_NODE_SSH_USER, - LOAD_NODES, -) -from frostfs_testlib.shell.interfaces import SshCredentials -from frostfs_testlib.storage.cluster import ClusterNode -from frostfs_testlib.storage.cluster.frostfs_services import S3Gate, StorageNode -from frostfs_testlib.storage.dataclasses.wallet import WalletInfo -from frostfs_testlib.testing.test_control import run_optionally - -reporter = get_reporter() - - -class BackgroundLoadController: - k6_instances: list[K6] - k6_dir: str - load_params: LoadParams - load_nodes: list[str] - verification_params: LoadParams - nodes_under_load: list[ClusterNode] - ssh_credentials: SshCredentials - loaders_wallet: WalletInfo - endpoints: list[str] - - def __init__( - self, - k6_dir: str, - load_params: LoadParams, - loaders_wallet: WalletInfo, - nodes_under_load: list[ClusterNode], - ) -> None: - self.k6_dir = k6_dir - self.load_params = load_params - self.nodes_under_load = nodes_under_load - self.load_nodes = LOAD_NODES - self.loaders_wallet = loaders_wallet - - if load_params.endpoint_selection_strategy is None: - raise RuntimeError("endpoint_selection_strategy should not be None") - - self.endpoints = self._get_endpoints( - load_params.load_type, load_params.endpoint_selection_strategy - ) - self.verification_params = LoadParams( - clients=load_params.readers, - scenario=LoadScenario.VERIFY, - registry_file=load_params.registry_file, - verify_time=load_params.verify_time, - load_type=load_params.load_type, - load_id=load_params.load_id, - working_dir=load_params.working_dir, - endpoint_selection_strategy=load_params.endpoint_selection_strategy, - k6_process_allocation_strategy=load_params.k6_process_allocation_strategy, - ) - self.ssh_credentials = SshCredentials( - LOAD_NODE_SSH_USER, - LOAD_NODE_SSH_PASSWORD, - LOAD_NODE_SSH_PRIVATE_KEY_PATH, - LOAD_NODE_SSH_PRIVATE_KEY_PASSPHRASE, - ) - - @run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED, []) - def _get_endpoints( - self, load_type: LoadType, endpoint_selection_strategy: EndpointSelectionStrategy - ): - all_endpoints = { - LoadType.gRPC: { - EndpointSelectionStrategy.ALL: list( - set( - endpoint - for node_under_load in self.nodes_under_load - for endpoint in node_under_load.service(StorageNode).get_all_rpc_endpoint() - ) - ), - EndpointSelectionStrategy.FIRST: list( - set( - node_under_load.service(StorageNode).get_rpc_endpoint() - for node_under_load in self.nodes_under_load - ) - ), - }, - # for some reason xk6 appends http protocol on its own - LoadType.S3: { - EndpointSelectionStrategy.ALL: list( - set( - endpoint.replace("http://", "") - for node_under_load in self.nodes_under_load - for endpoint in node_under_load.service(S3Gate).get_all_endpoints() - ) - ), - EndpointSelectionStrategy.FIRST: list( - set( - node_under_load.service(S3Gate).get_endpoint().replace("http://", "") - for node_under_load in self.nodes_under_load - ) - ), - }, - } - - return all_endpoints[load_type][endpoint_selection_strategy] - - @run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED) - @reporter.step_deco("Prepare background load instances") - def prepare(self): - if self.load_params.load_type == LoadType.S3: - init_s3_client( - self.load_nodes, - self.load_params, - self.k6_dir, - self.ssh_credentials, - self.nodes_under_load, - self.loaders_wallet, - ) - - self._prepare(self.load_params) - - def _prepare(self, load_params: LoadParams): - self.k6_instances = prepare_k6_instances( - load_nodes=LOAD_NODES, - ssh_credentials=self.ssh_credentials, - k6_dir=self.k6_dir, - load_params=load_params, - endpoints=self.endpoints, - loaders_wallet=self.loaders_wallet, - ) - - @run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED) - @reporter.step_deco("Start background load") - def start(self): - if self.load_params.preset is None: - raise RuntimeError("Preset should not be none at the moment of start") - - with reporter.step( - f"Start background load on nodes {self.nodes_under_load}: " - f"writers = {self.load_params.writers}, " - f"obj_size = {self.load_params.object_size}, " - f"load_time = {self.load_params.load_time}, " - f"prepare_json = {self.load_params.preset.pregen_json}, " - f"endpoints = {self.endpoints}" - ): - for k6_load_instance in self.k6_instances: - k6_load_instance.start() - - @run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED) - @reporter.step_deco("Stop background load") - def stop(self): - for k6_load_instance in self.k6_instances: - k6_load_instance.stop() - - @run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED, True) - def is_running(self): - for k6_load_instance in self.k6_instances: - if not k6_load_instance.is_running: - return False - - return True - - def wait_until_finish(self): - if self.load_params.load_time is None: - raise RuntimeError("LoadTime should not be none") - - for k6_instance in self.k6_instances: - k6_instance.wait_until_finished(self.load_params.load_time + int(K6_TEARDOWN_PERIOD)) - - @run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED) - def verify(self): - if self.verification_params.verify_time is None: - raise RuntimeError("verify_time should not be none") - - self._prepare(self.verification_params) - with reporter.step("Run verify background load data"): - for k6_verify_instance in self.k6_instances: - k6_verify_instance.start() - k6_verify_instance.wait_until_finished(self.verification_params.verify_time) - - @run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED) - @reporter.step_deco("K6 run results") - def get_results(self) -> dict: - results = {} - for k6_instance in self.k6_instances: - if k6_instance.load_params.k6_process_allocation_strategy is None: - raise RuntimeError("k6_process_allocation_strategy should not be none") - - result = k6_instance.get_results() - keys_map = { - K6ProcessAllocationStrategy.PER_LOAD_NODE: k6_instance.load_node, - K6ProcessAllocationStrategy.PER_ENDPOINT: k6_instance.endpoints[0], - } - key = keys_map[k6_instance.load_params.k6_process_allocation_strategy] - results[key] = result - - return results diff --git a/src/frostfs_testlib/controllers/cluster_state_controller.py b/src/frostfs_testlib/controllers/cluster_state_controller.py deleted file mode 100644 index 23d1a6cc..00000000 --- a/src/frostfs_testlib/controllers/cluster_state_controller.py +++ /dev/null @@ -1,130 +0,0 @@ -import time - -import allure - -import frostfs_testlib.resources.optionals as optionals -from frostfs_testlib.reporter import get_reporter -from frostfs_testlib.shell import CommandOptions, Shell -from frostfs_testlib.steps import epoch -from frostfs_testlib.storage.cluster import Cluster, ClusterNode, StorageNode -from frostfs_testlib.storage.controllers.disk_controller import DiskController -from frostfs_testlib.testing.test_control import run_optionally, wait_for_success -from frostfs_testlib.utils.failover_utils import ( - wait_all_storage_nodes_returned, - wait_for_host_offline, - wait_for_host_online, - wait_for_node_online, -) - -reporter = get_reporter() - - -class ClusterStateController: - def __init__(self, shell: Shell, cluster: Cluster) -> None: - self.stopped_nodes: list[ClusterNode] = [] - self.detached_disks: dict[str, DiskController] = {} - self.stopped_storage_nodes: list[StorageNode] = [] - self.cluster = cluster - self.shell = shell - - @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) - @reporter.step_deco("Stop host of node {node}") - def stop_node_host(self, node: ClusterNode, mode: str): - with allure.step(f"Stop host {node.host.config.address}"): - node.host.stop_host(mode=mode) - wait_for_host_offline(self.shell, node.storage_node) - self.stopped_nodes.append(node) - - @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) - @reporter.step_deco("Start host of node {node}") - def start_node_host(self, node: ClusterNode): - with allure.step(f"Start host {node.host.config.address}"): - node.host.start_host() - wait_for_host_online(self.shell, node.storage_node) - wait_for_node_online(node.storage_node) - self.stopped_nodes.remove(node) - - @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) - @reporter.step_deco("Start stopped hosts") - def start_stopped_hosts(self): - for node in self.stopped_nodes: - node.host.start_host() - self.stopped_nodes = [] - wait_all_storage_nodes_returned(self.shell, self.cluster) - - @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) - @reporter.step_deco("Detach disk {device} at {mountpoint} on node {node}") - def detach_disk(self, node: StorageNode, device: str, mountpoint: str): - disk_controller = self._get_disk_controller(node, device, mountpoint) - self.detached_disks[disk_controller.id] = disk_controller - disk_controller.detach() - - @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) - @reporter.step_deco("Attach disk {device} at {mountpoint} on node {node}") - def attach_disk(self, node: StorageNode, device: str, mountpoint: str): - disk_controller = self._get_disk_controller(node, device, mountpoint) - disk_controller.attach() - self.detached_disks.pop(disk_controller.id, None) - - @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) - @reporter.step_deco("Restore detached disks") - def restore_disks(self): - for disk_controller in self.detached_disks.values(): - disk_controller.attach() - self.detached_disks = {} - - @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) - @reporter.step_deco("Stop storage service on {node}") - def stop_storage_service(self, node: ClusterNode): - node.storage_node.stop_service() - self.stopped_storage_nodes.append(node.storage_node) - - @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) - @reporter.step_deco("Start storage service on {node}") - def start_storage_service(self, node: ClusterNode): - node.storage_node.start_service() - self.stopped_storage_nodes.remove(node.storage_node) - - @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) - @reporter.step_deco("Start stopped storage services") - def start_stopped_storage_services(self): - for node in self.stopped_storage_nodes: - node.start_service() - self.stopped_storage_nodes = [] - - @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) - @reporter.step_deco("Hard reboot host {node} via magic SysRq option") - def panic_reboot_host(self, node: ClusterNode): - shell = node.host.get_shell() - shell.exec('sudo sh -c "echo 1 > /proc/sys/kernel/sysrq"') - - options = CommandOptions(close_stdin=True, timeout=1, check=False) - shell.exec('sudo sh -c "echo b > /proc/sysrq-trigger"', options) - - # Let the things to be settled - # A little wait here to prevent ssh stuck during panic - time.sleep(10) - wait_for_host_online(self.shell, node.storage_node) - wait_for_node_online(node.storage_node) - - @reporter.step_deco("Wait up to {timeout} seconds for nodes on cluster to align epochs") - def wait_for_epochs_align(self, timeout=60): - @wait_for_success(timeout, 5, None, True) - def check_epochs(): - epochs_by_node = epoch.get_epochs_from_nodes(self.shell, self.cluster) - assert ( - len(set(epochs_by_node.values())) == 1 - ), f"unaligned epochs found: {epochs_by_node}" - - check_epochs() - - def _get_disk_controller( - self, node: StorageNode, device: str, mountpoint: str - ) -> DiskController: - disk_controller_id = DiskController.get_id(node, device) - if disk_controller_id in self.detached_disks.keys(): - disk_controller = self.detached_disks[disk_controller_id] - else: - disk_controller = DiskController(node, device, mountpoint) - - return disk_controller diff --git a/src/frostfs_testlib/load/k6.py b/src/frostfs_testlib/load/k6.py index 24ca4479..9a8b1d98 100644 --- a/src/frostfs_testlib/load/k6.py +++ b/src/frostfs_testlib/load/k6.py @@ -13,9 +13,10 @@ from frostfs_testlib.load.load_config import ( ) from frostfs_testlib.processes.remote_process import RemoteProcess from frostfs_testlib.reporter import get_reporter -from frostfs_testlib.resources.load_params import LOAD_NODE_SSH_USER +from frostfs_testlib.resources.load_params import K6_STOP_SIGNAL_TIMEOUT, LOAD_NODE_SSH_USER from frostfs_testlib.shell import Shell from frostfs_testlib.storage.dataclasses.wallet import WalletInfo +from frostfs_testlib.testing.test_control import wait_for_success EXIT_RESULT_CODE = 0 @@ -34,8 +35,6 @@ class LoadResults: class K6: _k6_process: RemoteProcess - _k6_stop_attempts: int = 5 - _k6_stop_check_interval: int = 15 def __init__( self, @@ -178,7 +177,7 @@ class K6: if timeout > 0: sleep(wait_interval) timeout -= wait_interval - self._stop() + self.stop() raise TimeoutError(f"Expected K6 finished in {timeout} sec.") def get_results(self) -> Any: @@ -200,21 +199,12 @@ class K6: reporter.attach(summary_text, allure_filename) return summary_json - @reporter.step_deco("Assert K6 should be finished") - def _k6_should_be_finished(self) -> None: - k6_rc = self._k6_process.rc() - assert k6_rc == 0, f"K6 unexpectedly finished with RC {k6_rc}" - - @reporter.step_deco("Terminate K6 on initiator") + @reporter.step_deco("Stop K6") def stop(self) -> None: - if not self.is_running: - self.get_results() - raise AssertionError("K6 unexpectedly finished") + if self.is_running: + self._k6_process.stop() - self._stop() - - k6_rc = self._k6_process.rc() - assert k6_rc == EXIT_RESULT_CODE, f"Return code of K6 job should be 0, but {k6_rc}" + self._wait_until_process_end() @property def is_running(self) -> bool: @@ -222,20 +212,12 @@ class K6: return self._k6_process.running() return False - @reporter.step_deco("Try to stop K6 with SIGTERM") - def _stop(self) -> None: - self._k6_process.stop() - with reporter.step("Wait until process end"): - for _ in range(self._k6_stop_attempts): - if not self._k6_process.running(): - break - - sleep(self._k6_stop_check_interval) - else: - raise AssertionError("Can not stop K6 process within timeout") - - def _kill(self) -> None: - self._k6_process.kill() + @reporter.step_deco("Wait until process end") + @wait_for_success( + K6_STOP_SIGNAL_TIMEOUT, 15, False, False, "Can not stop K6 process within timeout" + ) + def _wait_until_process_end(self): + return self._k6_process.running() def __log_output(self) -> None: reporter.attach(self._k6_process.stdout(full=True), "K6 stdout") diff --git a/src/frostfs_testlib/load/load_config.py b/src/frostfs_testlib/load/load_config.py index fd2fdef8..4e673215 100644 --- a/src/frostfs_testlib/load/load_config.py +++ b/src/frostfs_testlib/load/load_config.py @@ -143,6 +143,9 @@ class LoadParams: min_iteration_duration: Optional[str] = metadata_field( all_load_scenarios, None, "K6_MIN_ITERATION_DURATION" ) + # Specifies K6 setupTimeout time. Currently hardcoded in xk6 as 5 seconds for all scenarios + # https://k6.io/docs/using-k6/k6-options/reference/#setup-timeout + setup_timeout: Optional[str] = metadata_field(all_scenarios, None, "K6_SETUP_TIMEOUT") # ------- CONSTANT VUS SCENARIO PARAMS ------- # Amount of Writers VU. @@ -202,7 +205,7 @@ class LoadParams: # Maximum verification time for k6 to verify objects. Default is BACKGROUND_LOAD_MAX_VERIFY_TIME (3600). verify_time: Optional[int] = metadata_field([LoadScenario.VERIFY], None, "TIME_LIMIT") # Amount of Verification VU. - clients: Optional[int] = metadata_field([LoadScenario.VERIFY], None, "CLIENTS") + verify_clients: Optional[int] = metadata_field([LoadScenario.VERIFY], None, "CLIENTS", True) def set_id(self, load_id): self.load_id = load_id diff --git a/src/frostfs_testlib/resources/load_params.py b/src/frostfs_testlib/resources/load_params.py index 64f1aa46..6699207e 100644 --- a/src/frostfs_testlib/resources/load_params.py +++ b/src/frostfs_testlib/resources/load_params.py @@ -10,8 +10,10 @@ LOAD_NODE_SSH_PRIVATE_KEY_PASSPHRASE = os.getenv("LOAD_NODE_SSH_PRIVATE_KEY_PASS BACKGROUND_WRITERS_COUNT = os.getenv("BACKGROUND_WRITERS_COUNT", 0) BACKGROUND_READERS_COUNT = os.getenv("BACKGROUND_READERS_COUNT", 0) BACKGROUND_DELETERS_COUNT = os.getenv("BACKGROUND_DELETERS_COUNT", 0) +BACKGROUND_VERIFIERS_COUNT = os.getenv("BACKGROUND_VERIFIERS_COUNT", 0) BACKGROUND_LOAD_DEFAULT_TIME = os.getenv("BACKGROUND_LOAD_DEFAULT_TIME", 600) BACKGROUND_LOAD_DEFAULT_OBJECT_SIZE = os.getenv("BACKGROUND_LOAD_DEFAULT_OBJECT_SIZE", 32) +BACKGROUND_LOAD_SETUP_TIMEOUT = os.getenv("BACKGROUND_LOAD_SETUP_TIMEOUT", "5s") # This will decrease load params for some weak environments BACKGROUND_LOAD_VUS_COUNT_DIVISOR = os.getenv("BACKGROUND_LOAD_VUS_COUNT_DIVISOR", 1) @@ -27,4 +29,5 @@ PRESET_CONTAINERS_COUNT = os.getenv("CONTAINERS_COUNT", "40") PRESET_OBJECTS_COUNT = os.getenv("OBJ_COUNT", "10") K6_DIRECTORY = os.getenv("K6_DIRECTORY", "/etc/k6") K6_TEARDOWN_PERIOD = os.getenv("K6_TEARDOWN_PERIOD", "30") +K6_STOP_SIGNAL_TIMEOUT = int(os.getenv("K6_STOP_SIGNAL_TIMEOUT", 300)) LOAD_CONFIG_YAML_PATH = os.getenv("LOAD_CONFIG_YAML_PATH", "load_config_yaml_file.yml") diff --git a/src/frostfs_testlib/storage/controllers/__init__.py b/src/frostfs_testlib/storage/controllers/__init__.py new file mode 100644 index 00000000..65268f45 --- /dev/null +++ b/src/frostfs_testlib/storage/controllers/__init__.py @@ -0,0 +1,4 @@ +from frostfs_testlib.storage.controllers.background_load_controller import BackgroundLoadController +from frostfs_testlib.storage.controllers.cluster_state_controller import ClusterStateController +from frostfs_testlib.storage.controllers.disk_controller import DiskController, DiskInfo +from frostfs_testlib.storage.controllers.shards_watcher import ShardsWatcher diff --git a/src/frostfs_testlib/storage/controllers/background_load_controller.py b/src/frostfs_testlib/storage/controllers/background_load_controller.py index fedda190..f9cf0e59 100644 --- a/src/frostfs_testlib/storage/controllers/background_load_controller.py +++ b/src/frostfs_testlib/storage/controllers/background_load_controller.py @@ -1,3 +1,5 @@ +import time + import frostfs_testlib.resources.optionals as optionals from frostfs_testlib.load.k6 import K6 from frostfs_testlib.load.load_config import ( @@ -22,6 +24,7 @@ from frostfs_testlib.storage.cluster import ClusterNode from frostfs_testlib.storage.dataclasses.frostfs_services import S3Gate, StorageNode from frostfs_testlib.storage.dataclasses.wallet import WalletInfo from frostfs_testlib.testing.test_control import run_optionally +from frostfs_testlib.utils import datetime_utils reporter = get_reporter() @@ -57,7 +60,7 @@ class BackgroundLoadController: load_params.load_type, load_params.endpoint_selection_strategy ) self.verification_params = LoadParams( - clients=load_params.readers, + verify_clients=load_params.verify_clients, scenario=LoadScenario.VERIFY, registry_file=load_params.registry_file, verify_time=load_params.verify_time, @@ -156,6 +159,12 @@ class BackgroundLoadController: for k6_load_instance in self.k6_instances: k6_load_instance.start() + wait_after_start_time = datetime_utils.parse_time(self.load_params.setup_timeout) + 5 + with reporter.step( + f"Wait for start timeout + couple more seconds ({wait_after_start_time}) before moving on" + ): + time.sleep(wait_after_start_time) + @run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED) @reporter.step_deco("Stop background load") def stop(self): @@ -170,6 +179,7 @@ class BackgroundLoadController: return True + @run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED) def wait_until_finish(self): if self.load_params.load_time is None: raise RuntimeError("LoadTime should not be none") diff --git a/src/frostfs_testlib/testing/test_control.py b/src/frostfs_testlib/testing/test_control.py index 5621a610..ed74f6af 100644 --- a/src/frostfs_testlib/testing/test_control.py +++ b/src/frostfs_testlib/testing/test_control.py @@ -123,6 +123,7 @@ def wait_for_success( interval: int = 1, expected_result: Any = None, fail_testcase: bool = False, + fail_message: str = "", ): """ Decorator to wait for some conditions/functions to pass successfully. @@ -141,7 +142,7 @@ def wait_for_success( try: actual_result = func(*a, **kw) if expected_result is not None: - assert expected_result == actual_result + assert expected_result == actual_result, fail_message return actual_result except Exception as ex: logger.debug(ex) diff --git a/src/frostfs_testlib/utils/datetime_utils.py b/src/frostfs_testlib/utils/datetime_utils.py index a357d8a8..830178f9 100644 --- a/src/frostfs_testlib/utils/datetime_utils.py +++ b/src/frostfs_testlib/utils/datetime_utils.py @@ -10,6 +10,9 @@ def parse_time(value: str) -> int: Returns: Number of seconds in the parsed time interval. """ + if value is None: + return 0 + value = value.lower() for suffix in ["s", "sec"]: