diff --git a/src/frostfs_testlib/storage/controllers/cluster_state_controller.py b/src/frostfs_testlib/storage/controllers/cluster_state_controller.py index c73a8f4..3a2b509 100644 --- a/src/frostfs_testlib/storage/controllers/cluster_state_controller.py +++ b/src/frostfs_testlib/storage/controllers/cluster_state_controller.py @@ -1,13 +1,13 @@ +import copy import time -from concurrent.futures import ThreadPoolExecutor import frostfs_testlib.resources.optionals as optionals from frostfs_testlib.reporter import get_reporter from frostfs_testlib.shell import CommandOptions, Shell -from frostfs_testlib.steps import epoch from frostfs_testlib.storage.cluster import Cluster, ClusterNode, StorageNode from frostfs_testlib.storage.controllers.disk_controller import DiskController -from frostfs_testlib.testing.test_control import run_optionally, wait_for_success +from frostfs_testlib.testing import parallel +from frostfs_testlib.testing.test_control import run_optionally from frostfs_testlib.utils.failover_utils import ( wait_all_storage_nodes_returned, wait_for_host_offline, @@ -139,15 +139,8 @@ class ClusterStateController: # In case if we stopped couple services, for example (s01-s04): # After starting only s01, it may require connections to s02-s04, which is still down, and fail to start. # Also, if something goes wrong here, we might skip s02-s04 start at all, and cluster will be left in a bad state. - # So in order to make sure that services are at least attempted to be started, using threads here. - with ThreadPoolExecutor(max_workers=len(self.stopped_storage_nodes)) as executor: - start_result = executor.map(self.start_storage_service, self.stopped_storage_nodes) - - # Looks tricky, but if exception is raised in any thread, it will be "eaten" by ThreadPoolExecutor, - # But will be thrown here. - # Not ideal solution, but okay for now - for _ in start_result: - pass + # So in order to make sure that services are at least attempted to be started, using parallel runs here. + parallel(self.start_storage_service, copy.copy(self.stopped_storage_nodes)) wait_all_storage_nodes_returned(self.shell, self.cluster) self.stopped_storage_nodes = [] @@ -170,14 +163,8 @@ class ClusterStateController: if not self.stopped_s3_gates: return - with ThreadPoolExecutor(max_workers=len(self.stopped_s3_gates)) as executor: - start_result = executor.map(self.start_s3_gate, self.stopped_s3_gates) - - # Looks tricky, but if exception is raised in any thread, it will be "eaten" by ThreadPoolExecutor, - # But will be thrown here. - # Not ideal solution, but okay for now - for _ in start_result: - pass + parallel(self.start_s3_gate, copy.copy(self.stopped_s3_gates)) + self.stopped_s3_gates = [] @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @reporter.step_deco("Suspend {process_name} service in {node}")