diff --git a/pyproject.toml b/pyproject.toml
index 3178bbe..ba38c03 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -50,10 +50,10 @@ basic = "frostfs_testlib.healthcheck.basic_healthcheck:BasicHealthcheck"
 [tool.isort]
 profile = "black"
 src_paths = ["src", "tests"]
-line_length = 100
+line_length = 120
 
 [tool.black]
-line-length = 100
+line-length = 120
 target-version = ["py310"]
 
 [tool.bumpver]
diff --git a/src/frostfs_testlib/storage/controllers/cluster_state_controller.py b/src/frostfs_testlib/storage/controllers/cluster_state_controller.py
index 2cf1451..473af10 100644
--- a/src/frostfs_testlib/storage/controllers/cluster_state_controller.py
+++ b/src/frostfs_testlib/storage/controllers/cluster_state_controller.py
@@ -1,3 +1,4 @@
+import datetime
 import time
 
 import frostfs_testlib.resources.optionals as optionals
@@ -46,9 +47,7 @@ class ClusterStateController:
         return set(stopped_on_nodes)
 
     def _get_online(self, service_type: type[ServiceClass]) -> set[ServiceClass]:
-        stopped_svc = self._get_stopped_by_type(service_type).union(
-            self._from_stopped_nodes(service_type)
-        )
+        stopped_svc = self._get_stopped_by_type(service_type).union(self._from_stopped_nodes(service_type))
         online_svc = set(self.cluster.services(service_type)) - stopped_svc
         return online_svc
 
@@ -67,9 +66,7 @@ class ClusterStateController:
     @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
     @reporter.step_deco("Shutdown whole cluster")
     def shutdown_cluster(self, mode: str, reversed_order: bool = False):
-        nodes = (
-            reversed(self.cluster.cluster_nodes) if reversed_order else self.cluster.cluster_nodes
-        )
+        nodes = reversed(self.cluster.cluster_nodes) if reversed_order else self.cluster.cluster_nodes
 
         # Drop all ssh connections before shutdown
         provider = SshConnectionProvider()
@@ -149,9 +146,7 @@ class ClusterStateController:
     def wait_s3gate(self, s3gate: S3Gate):
         with reporter.step(f"Wait for {s3gate} reconnection"):
             result = s3gate.get_metric("frostfs_s3_gw_pool_current_nodes")
-            assert (
-                'address="127.0.0.1' in result.stdout
-            ), "S3Gate should connect to local storage node"
+            assert 'address="127.0.0.1' in result.stdout, "S3Gate should connect to local storage node"
 
     @reporter.step_deco("Wait for S3Gates reconnection to local storage")
     def wait_s3gates(self):
@@ -181,9 +176,7 @@ class ClusterStateController:
 
     @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
     @reporter.step_deco("Stop {service_type} service on {node}")
-    def stop_service_of_type(
-        self, node: ClusterNode, service_type: type[ServiceClass], mask: bool = True
-    ):
+    def stop_service_of_type(self, node: ClusterNode, service_type: type[ServiceClass], mask: bool = True):
         service = node.service(service_type)
         service.stop_service(mask)
         self.stopped_services.add(service)
@@ -212,9 +205,7 @@ class ClusterStateController:
     @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
     @reporter.step_deco("Stop all storage services on cluster")
     def stop_all_storage_services(self, reversed_order: bool = False):
-        nodes = (
-            reversed(self.cluster.cluster_nodes) if reversed_order else self.cluster.cluster_nodes
-        )
+        nodes = reversed(self.cluster.cluster_nodes) if reversed_order else self.cluster.cluster_nodes
 
         for node in nodes:
             self.stop_service_of_type(node, StorageNode)
@@ -223,9 +214,7 @@ class ClusterStateController:
     @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
     @reporter.step_deco("Stop all S3 gates on cluster")
     def stop_all_s3_gates(self, reversed_order: bool = False):
-        nodes = (
-            reversed(self.cluster.cluster_nodes) if reversed_order else self.cluster.cluster_nodes
-        )
+        nodes = reversed(self.cluster.cluster_nodes) if reversed_order else self.cluster.cluster_nodes
 
         for node in nodes:
             self.stop_service_of_type(node, S3Gate)
@@ -279,10 +268,7 @@ class ClusterStateController:
     @reporter.step_deco("Resume {process_name} service in {node}")
     def resume_service(self, process_name: str, node: ClusterNode):
         node.host.wait_success_resume_process(process_name)
-        if (
-            self.suspended_services.get(process_name)
-            and node in self.suspended_services[process_name]
-        ):
+        if self.suspended_services.get(process_name) and node in self.suspended_services[process_name]:
             self.suspended_services[process_name].remove(node)
 
     @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@@ -388,9 +374,46 @@ class ClusterStateController:
         for node in self.nodes_with_modified_interface:
             if_up_down_helper.up_all_interface(node)
 
-    def _get_disk_controller(
-        self, node: StorageNode, device: str, mountpoint: str
-    ) -> DiskController:
+    @reporter.step_deco("Get node time")
+    def get_node_date(self, node: ClusterNode) -> datetime:
+        shell = node.host.get_shell()
+        return datetime.datetime.strptime(shell.exec("hwclock -r").stdout.strip(), "%Y-%m-%d %H:%M:%S.%f%z")
+
+    @reporter.step_deco("Set node time to {in_date}")
+    def change_node_date(self, node: ClusterNode, in_date: datetime) -> None:
+        shell = node.host.get_shell()
+        shell.exec(f"hwclock --set --date='{in_date}'")
+        shell.exec("hwclock --hctosys")
+        node_time = self.get_node_date(node)
+        with reporter.step(f"Verify difference between {node_time} and {in_date} is less than a minute"):
+            assert (self.get_node_date(node) - in_date) < datetime.timedelta(minutes=1)
+
+    @reporter.step_deco(f"Restore time")
+    def restore_node_date(self, node: ClusterNode) -> None:
+        shell = node.host.get_shell()
+        now_time = datetime.datetime.now(datetime.timezone.utc)
+        with reporter.step(f"Set {now_time} time"):
+            shell.exec(f"hwclock --set --date='{now_time}'")
+            shell.exec("hwclock --hctosys")
+
+    @reporter.step_deco("Change the synchronizer status to {status}")
+    def set_sync_date_all_nodes(self, status: str):
+        if status == "active":
+            parallel(self._enable_date_synchronizer, self.cluster.cluster_nodes)
+            return
+        parallel(self._disable_date_synchronizer, self.cluster.cluster_nodes)
+
+    def _enable_date_synchronizer(self, cluster_node: ClusterNode):
+        shell = cluster_node.host.get_shell()
+        shell.exec("timedatectl set-ntp true")
+        cluster_node.host.wait_for_service_to_be_in_state("systemd-timesyncd", "active", 5)
+
+    def _disable_date_synchronizer(self, cluster_node: ClusterNode):
+        shell = cluster_node.host.get_shell()
+        shell.exec("timedatectl set-ntp false")
+        cluster_node.host.wait_for_service_to_be_in_state("systemd-timesyncd", "inactive", 5)
+
+    def _get_disk_controller(self, node: StorageNode, device: str, mountpoint: str) -> DiskController:
         disk_controller_id = DiskController.get_id(node, device)
         if disk_controller_id in self.detached_disks.keys():
             disk_controller = self.detached_disks[disk_controller_id]