Make load things parallel #48
8 changed files with 284 additions and 178 deletions
|
@ -39,6 +39,10 @@ class ScenarioRunner(ABC):
|
||||||
def stop(self):
|
def stop(self):
|
||||||
"""Stop K6 instances"""
|
"""Stop K6 instances"""
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def preset(self):
|
||||||
|
"""Run preset for load"""
|
||||||
|
|
||||||
@property
|
@property
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def is_running(self) -> bool:
|
def is_running(self) -> bool:
|
||||||
|
|
|
@ -72,8 +72,8 @@ class K6:
|
||||||
def process_dir(self) -> str:
|
def process_dir(self) -> str:
|
||||||
return self._k6_process.process_dir
|
return self._k6_process.process_dir
|
||||||
|
|
||||||
@reporter.step_deco("Preset containers and objects")
|
|
||||||
def preset(self) -> str:
|
def preset(self) -> str:
|
||||||
|
with reporter.step(f"Run preset on loader {self.loader.ip} for endpoints {self.endpoints}"):
|
||||||
preset_grpc = f"{self._k6_dir}/scenarios/preset/preset_grpc.py"
|
preset_grpc = f"{self._k6_dir}/scenarios/preset/preset_grpc.py"
|
||||||
preset_s3 = f"{self._k6_dir}/scenarios/preset/preset_s3.py"
|
preset_s3 = f"{self._k6_dir}/scenarios/preset/preset_s3.py"
|
||||||
preset_map = {
|
preset_map = {
|
||||||
|
@ -85,13 +85,13 @@ class K6:
|
||||||
base_args = {
|
base_args = {
|
||||||
preset_grpc: [
|
preset_grpc: [
|
||||||
preset_grpc,
|
preset_grpc,
|
||||||
f"--endpoint {self.endpoints[0]}",
|
f"--endpoint {','.join(self.endpoints)}",
|
||||||
f"--wallet {self.wallet.path} ",
|
f"--wallet {self.wallet.path} ",
|
||||||
f"--config {self.wallet.config_path} ",
|
f"--config {self.wallet.config_path} ",
|
||||||
],
|
],
|
||||||
preset_s3: [
|
preset_s3: [
|
||||||
preset_s3,
|
preset_s3,
|
||||||
f"--endpoint {self.endpoints[0]}",
|
f"--endpoint {','.join(self.endpoints)}",
|
||||||
],
|
],
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -232,7 +232,6 @@ class K6:
|
||||||
|
|
||||||
self._wait_until_process_end()
|
self._wait_until_process_end()
|
||||||
|
|
||||||
@property
|
|
||||||
def is_running(self) -> bool:
|
def is_running(self) -> bool:
|
||||||
if self._k6_process:
|
if self._k6_process:
|
||||||
return self._k6_process.running()
|
return self._k6_process.running()
|
||||||
|
|
|
@ -43,8 +43,10 @@ class LoadReport:
|
||||||
return html
|
return html
|
||||||
|
|
||||||
def _get_load_params_section_html(self) -> str:
|
def _get_load_params_section_html(self) -> str:
|
||||||
params: str = yaml.safe_dump(self.load_test, sort_keys=False)
|
params: str = yaml.safe_dump(
|
||||||
params = params.replace("\n", "<br>")
|
[self.load_test], sort_keys=False, indent=2, explicit_start=True
|
||||||
|
)
|
||||||
|
params = params.replace("\n", "<br>").replace(" ", " ")
|
||||||
section_html = f"""<h3>Scenario params</h3>
|
section_html = f"""<h3>Scenario params</h3>
|
||||||
|
|
||||||
<pre>{params}</pre>
|
<pre>{params}</pre>
|
||||||
|
@ -139,7 +141,7 @@ class LoadReport:
|
||||||
duration = self._seconds_to_formatted_duration(self.load_params.load_time)
|
duration = self._seconds_to_formatted_duration(self.load_params.load_time)
|
||||||
model = self._get_model_string()
|
model = self._get_model_string()
|
||||||
# write 8KB 15h49m 50op/sec 50th open model/closed model/min_iteration duration=1s - 1.636MB/s 199.57451/s
|
# write 8KB 15h49m 50op/sec 50th open model/closed model/min_iteration duration=1s - 1.636MB/s 199.57451/s
|
||||||
short_summary = f"{operation_type} {object_size}{object_size_unit} {duration} {requested_rate_str} {vus_str} {model} - {throughput:.2f}{unit} {total_rate:.2f}/s"
|
short_summary = f"{operation_type} {object_size}{object_size_unit} {duration} {requested_rate_str} {vus_str} {model} - {throughput:.2f}{unit}/s {total_rate:.2f}/s"
|
||||||
|
|
||||||
html = f"""
|
html = f"""
|
||||||
<table border="1" cellpadding="5px"><tbody>
|
<table border="1" cellpadding="5px"><tbody>
|
||||||
|
|
|
@ -28,15 +28,31 @@ from frostfs_testlib.storage.cluster import ClusterNode
|
||||||
from frostfs_testlib.storage.controllers.cluster_state_controller import ClusterStateController
|
from frostfs_testlib.storage.controllers.cluster_state_controller import ClusterStateController
|
||||||
from frostfs_testlib.storage.dataclasses.frostfs_services import S3Gate, StorageNode
|
from frostfs_testlib.storage.dataclasses.frostfs_services import S3Gate, StorageNode
|
||||||
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
|
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
|
||||||
from frostfs_testlib.testing.test_control import run_optionally
|
from frostfs_testlib.testing import parallel, run_optionally
|
||||||
from frostfs_testlib.utils import datetime_utils
|
from frostfs_testlib.utils import FileKeeper, datetime_utils
|
||||||
from frostfs_testlib.utils.file_keeper import FileKeeper
|
|
||||||
|
|
||||||
reporter = get_reporter()
|
reporter = get_reporter()
|
||||||
|
|
||||||
|
|
||||||
class DefaultRunner(ScenarioRunner):
|
class RunnerBase(ScenarioRunner):
|
||||||
k6_instances: list[K6]
|
k6_instances: list[K6]
|
||||||
|
|
||||||
|
@reporter.step_deco("Run preset on loaders")
|
||||||
|
def preset(self):
|
||||||
|
parallel([k6.preset for k6 in self.k6_instances])
|
||||||
|
|
||||||
|
@reporter.step_deco("Wait until load finish")
|
||||||
|
def wait_until_finish(self):
|
||||||
|
parallel([k6.wait_until_finished for k6 in self.k6_instances])
|
||||||
|
|
||||||
|
@property
|
||||||
|
def is_running(self):
|
||||||
|
futures = parallel([k6.is_running for k6 in self.k6_instances])
|
||||||
|
|
||||||
|
return any([future.result() for future in futures])
|
||||||
|
|
||||||
|
|
||||||
|
class DefaultRunner(RunnerBase):
|
||||||
loaders: list[Loader]
|
loaders: list[Loader]
|
||||||
loaders_wallet: WalletInfo
|
loaders_wallet: WalletInfo
|
||||||
|
|
||||||
|
@ -51,7 +67,7 @@ class DefaultRunner(ScenarioRunner):
|
||||||
self.loaders_wallet = loaders_wallet
|
self.loaders_wallet = loaders_wallet
|
||||||
|
|
||||||
@run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED)
|
@run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED)
|
||||||
@reporter.step_deco("Prepare load instances")
|
@reporter.step_deco("Preparation steps")
|
||||||
def prepare(
|
def prepare(
|
||||||
self,
|
self,
|
||||||
load_params: LoadParams,
|
load_params: LoadParams,
|
||||||
|
@ -68,12 +84,21 @@ class DefaultRunner(ScenarioRunner):
|
||||||
]
|
]
|
||||||
grpc_peer = storage_node.get_rpc_endpoint()
|
grpc_peer = storage_node.get_rpc_endpoint()
|
||||||
|
|
||||||
for loader in self.loaders:
|
parallel(
|
||||||
|
self._prepare_loader, self.loaders, load_params, grpc_peer, s3_public_keys, k6_dir
|
||||||
|
)
|
||||||
|
|
||||||
|
def _prepare_loader(
|
||||||
|
self,
|
||||||
|
loader: Loader,
|
||||||
|
load_params: LoadParams,
|
||||||
|
grpc_peer: str,
|
||||||
|
s3_public_keys: list[str],
|
||||||
|
k6_dir: str,
|
||||||
|
):
|
||||||
with reporter.step(f"Init s3 client on {loader.ip}"):
|
with reporter.step(f"Init s3 client on {loader.ip}"):
|
||||||
shell = loader.get_shell()
|
shell = loader.get_shell()
|
||||||
frostfs_authmate_exec: FrostfsAuthmate = FrostfsAuthmate(
|
frostfs_authmate_exec: FrostfsAuthmate = FrostfsAuthmate(shell, FROSTFS_AUTHMATE_EXEC)
|
||||||
shell, FROSTFS_AUTHMATE_EXEC
|
|
||||||
)
|
|
||||||
issue_secret_output = frostfs_authmate_exec.secret.issue(
|
issue_secret_output = frostfs_authmate_exec.secret.issue(
|
||||||
wallet=self.loaders_wallet.path,
|
wallet=self.loaders_wallet.path,
|
||||||
peer=grpc_peer,
|
peer=grpc_peer,
|
||||||
|
@ -95,9 +120,7 @@ class DefaultRunner(ScenarioRunner):
|
||||||
)
|
)
|
||||||
|
|
||||||
configure_input = [
|
configure_input = [
|
||||||
InteractiveInput(
|
InteractiveInput(prompt_pattern=r"AWS Access Key ID.*", input=aws_access_key_id),
|
||||||
prompt_pattern=r"AWS Access Key ID.*", input=aws_access_key_id
|
|
||||||
),
|
|
||||||
InteractiveInput(
|
InteractiveInput(
|
||||||
prompt_pattern=r"AWS Secret Access Key.*", input=aws_secret_access_key
|
prompt_pattern=r"AWS Secret Access Key.*", input=aws_secret_access_key
|
||||||
),
|
),
|
||||||
|
@ -106,10 +129,7 @@ class DefaultRunner(ScenarioRunner):
|
||||||
]
|
]
|
||||||
shell.exec("aws configure", CommandOptions(interactive_inputs=configure_input))
|
shell.exec("aws configure", CommandOptions(interactive_inputs=configure_input))
|
||||||
|
|
||||||
def wait_until_finish(self):
|
@reporter.step_deco("Init k6 instances")
|
||||||
for k6_instance in self.k6_instances:
|
|
||||||
k6_instance.wait_until_finished()
|
|
||||||
|
|
||||||
def init_k6_instances(self, load_params: LoadParams, endpoints: list[str], k6_dir: str):
|
def init_k6_instances(self, load_params: LoadParams, endpoints: list[str], k6_dir: str):
|
||||||
self.k6_instances = []
|
self.k6_instances = []
|
||||||
cycled_loaders = itertools.cycle(self.loaders)
|
cycled_loaders = itertools.cycle(self.loaders)
|
||||||
|
@ -131,29 +151,32 @@ class DefaultRunner(ScenarioRunner):
|
||||||
load_params, k6_processes_count
|
load_params, k6_processes_count
|
||||||
)
|
)
|
||||||
|
|
||||||
for distributed_load_params in distributed_load_params_list:
|
futures = parallel(
|
||||||
loader = next(cycled_loaders)
|
self._init_k6_instance,
|
||||||
shell = loader.get_shell()
|
distributed_load_params_list,
|
||||||
with reporter.step(
|
loader=cycled_loaders,
|
||||||
f"Init K6 instances on {loader.ip} for load id {distributed_load_params.load_id}"
|
endpoints=endpoints_gen,
|
||||||
):
|
k6_dir=k6_dir,
|
||||||
with reporter.step(f"Make working directory"):
|
|
||||||
shell.exec(f"sudo mkdir -p {distributed_load_params.working_dir}")
|
|
||||||
shell.exec(
|
|
||||||
f"sudo chown {LOAD_NODE_SSH_USER} {distributed_load_params.working_dir}"
|
|
||||||
)
|
)
|
||||||
|
self.k6_instances = [future.result() for future in futures]
|
||||||
|
|
||||||
k6_instance = K6(
|
def _init_k6_instance(
|
||||||
distributed_load_params,
|
self, load_params_for_loader: LoadParams, loader: Loader, endpoints: list[str], k6_dir: str
|
||||||
next(endpoints_gen),
|
):
|
||||||
|
shell = loader.get_shell()
|
||||||
|
with reporter.step(f"Init K6 instance on {loader.ip} for endpoints {endpoints}"):
|
||||||
|
with reporter.step(f"Make working directory"):
|
||||||
|
shell.exec(f"sudo mkdir -p {load_params_for_loader.working_dir}")
|
||||||
|
shell.exec(f"sudo chown {LOAD_NODE_SSH_USER} {load_params_for_loader.working_dir}")
|
||||||
|
|
||||||
|
return K6(
|
||||||
|
load_params_for_loader,
|
||||||
|
endpoints,
|
||||||
k6_dir,
|
k6_dir,
|
||||||
shell,
|
shell,
|
||||||
loader,
|
loader,
|
||||||
self.loaders_wallet,
|
self.loaders_wallet,
|
||||||
)
|
)
|
||||||
self.k6_instances.append(k6_instance)
|
|
||||||
if load_params.preset:
|
|
||||||
k6_instance.preset()
|
|
||||||
|
|
||||||
def _get_distributed_load_params_list(
|
def _get_distributed_load_params_list(
|
||||||
self, original_load_params: LoadParams, workers_count: int
|
self, original_load_params: LoadParams, workers_count: int
|
||||||
|
@ -215,15 +238,7 @@ class DefaultRunner(ScenarioRunner):
|
||||||
def start(self):
|
def start(self):
|
||||||
load_params = self.k6_instances[0].load_params
|
load_params = self.k6_instances[0].load_params
|
||||||
|
|
||||||
with ThreadPoolExecutor(max_workers=len(self.k6_instances)) as executor:
|
parallel([k6.start for k6 in self.k6_instances])
|
||||||
futures = [executor.submit(k6.start) for k6 in self.k6_instances]
|
|
||||||
|
|
||||||
# Check for exceptions
|
|
||||||
exceptions = [future.exception() for future in futures if future.exception()]
|
|
||||||
if exceptions:
|
|
||||||
raise RuntimeError(
|
|
||||||
f"The following exceptions occured during start of k6: {exceptions}"
|
|
||||||
)
|
|
||||||
|
|
||||||
wait_after_start_time = datetime_utils.parse_time(load_params.setup_timeout) + 5
|
wait_after_start_time = datetime_utils.parse_time(load_params.setup_timeout) + 5
|
||||||
with reporter.step(
|
with reporter.step(
|
||||||
|
@ -251,17 +266,8 @@ class DefaultRunner(ScenarioRunner):
|
||||||
|
|
||||||
return results
|
return results
|
||||||
|
|
||||||
@property
|
|
||||||
def is_running(self):
|
|
||||||
for k6_instance in self.k6_instances:
|
|
||||||
if not k6_instance.is_running:
|
|
||||||
return False
|
|
||||||
|
|
||||||
return True
|
class LocalRunner(RunnerBase):
|
||||||
|
|
||||||
|
|
||||||
class LocalRunner(ScenarioRunner):
|
|
||||||
k6_instances: list[K6]
|
|
||||||
loaders: list[Loader]
|
loaders: list[Loader]
|
||||||
cluster_state_controller: ClusterStateController
|
cluster_state_controller: ClusterStateController
|
||||||
file_keeper: FileKeeper
|
file_keeper: FileKeeper
|
||||||
|
@ -278,7 +284,7 @@ class LocalRunner(ScenarioRunner):
|
||||||
self.loaders = [NodeLoader(node) for node in nodes_under_load]
|
self.loaders = [NodeLoader(node) for node in nodes_under_load]
|
||||||
|
|
||||||
@run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED)
|
@run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED)
|
||||||
@reporter.step_deco("Prepare load instances")
|
@reporter.step_deco("Preparation steps")
|
||||||
def prepare(
|
def prepare(
|
||||||
self,
|
self,
|
||||||
load_params: LoadParams,
|
load_params: LoadParams,
|
||||||
|
@ -319,15 +325,20 @@ class LocalRunner(ScenarioRunner):
|
||||||
for _ in result:
|
for _ in result:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def wait_until_finish(self):
|
@reporter.step_deco("Init k6 instances")
|
||||||
for k6_instance in self.k6_instances:
|
|
||||||
k6_instance.wait_until_finished()
|
|
||||||
|
|
||||||
def init_k6_instances(self, load_params: LoadParams, endpoints: list[str], k6_dir: str):
|
def init_k6_instances(self, load_params: LoadParams, endpoints: list[str], k6_dir: str):
|
||||||
self.k6_instances = []
|
self.k6_instances = []
|
||||||
for loader in self.loaders:
|
futures = parallel(
|
||||||
|
self._init_k6_instance,
|
||||||
|
self.loaders,
|
||||||
|
load_params,
|
||||||
|
k6_dir,
|
||||||
|
)
|
||||||
|
self.k6_instances = [future.result() for future in futures]
|
||||||
|
|
||||||
|
def _init_k6_instance(self, loader: Loader, load_params: LoadParams, k6_dir: str):
|
||||||
shell = loader.get_shell()
|
shell = loader.get_shell()
|
||||||
with reporter.step(f"Init K6 instances on {loader.ip}"):
|
with reporter.step(f"Init K6 instance on {loader.ip}"):
|
||||||
with reporter.step(f"Make working directory"):
|
with reporter.step(f"Make working directory"):
|
||||||
shell.exec(f"sudo mkdir -p {load_params.working_dir}")
|
shell.exec(f"sudo mkdir -p {load_params.working_dir}")
|
||||||
# If we chmod /home/<user_name> folder we can no longer ssh to the node
|
# If we chmod /home/<user_name> folder we can no longer ssh to the node
|
||||||
|
@ -339,7 +350,7 @@ class LocalRunner(ScenarioRunner):
|
||||||
):
|
):
|
||||||
shell.exec(f"sudo chmod -R 777 {load_params.working_dir}")
|
shell.exec(f"sudo chmod -R 777 {load_params.working_dir}")
|
||||||
|
|
||||||
k6_instance = K6(
|
return K6(
|
||||||
load_params,
|
load_params,
|
||||||
["localhost:8080"],
|
["localhost:8080"],
|
||||||
k6_dir,
|
k6_dir,
|
||||||
|
@ -347,9 +358,6 @@ class LocalRunner(ScenarioRunner):
|
||||||
loader,
|
loader,
|
||||||
self.wallet,
|
self.wallet,
|
||||||
)
|
)
|
||||||
self.k6_instances.append(k6_instance)
|
|
||||||
if load_params.preset:
|
|
||||||
k6_instance.preset()
|
|
||||||
|
|
||||||
def start(self):
|
def start(self):
|
||||||
load_params = self.k6_instances[0].load_params
|
load_params = self.k6_instances[0].load_params
|
||||||
|
@ -357,15 +365,7 @@ class LocalRunner(ScenarioRunner):
|
||||||
self.cluster_state_controller.stop_all_s3_gates()
|
self.cluster_state_controller.stop_all_s3_gates()
|
||||||
self.cluster_state_controller.stop_all_storage_services()
|
self.cluster_state_controller.stop_all_storage_services()
|
||||||
|
|
||||||
with ThreadPoolExecutor(max_workers=len(self.k6_instances)) as executor:
|
parallel([k6.start for k6 in self.k6_instances])
|
||||||
futures = [executor.submit(k6.start) for k6 in self.k6_instances]
|
|
||||||
|
|
||||||
# Check for exceptions
|
|
||||||
exceptions = [future.exception() for future in futures if future.exception()]
|
|
||||||
if exceptions:
|
|
||||||
raise RuntimeError(
|
|
||||||
f"The following exceptions occured during start of k6: {exceptions}"
|
|
||||||
)
|
|
||||||
|
|
||||||
wait_after_start_time = datetime_utils.parse_time(load_params.setup_timeout) + 5
|
wait_after_start_time = datetime_utils.parse_time(load_params.setup_timeout) + 5
|
||||||
with reporter.step(
|
with reporter.step(
|
||||||
|
@ -387,11 +387,3 @@ class LocalRunner(ScenarioRunner):
|
||||||
results[k6_instance.loader.ip] = result
|
results[k6_instance.loader.ip] = result
|
||||||
|
|
||||||
return results
|
return results
|
||||||
|
|
||||||
@property
|
|
||||||
def is_running(self):
|
|
||||||
for k6_instance in self.k6_instances:
|
|
||||||
if not k6_instance.is_running:
|
|
||||||
return False
|
|
||||||
|
|
||||||
return True
|
|
||||||
|
|
|
@ -80,14 +80,17 @@ class BackgroundLoadController:
|
||||||
LoadType.S3: {
|
LoadType.S3: {
|
||||||
EndpointSelectionStrategy.ALL: list(
|
EndpointSelectionStrategy.ALL: list(
|
||||||
set(
|
set(
|
||||||
endpoint.replace("http://", "")
|
endpoint.replace("http://", "").replace("https://", "")
|
||||||
for node_under_load in self.nodes_under_load
|
for node_under_load in self.nodes_under_load
|
||||||
for endpoint in node_under_load.service(S3Gate).get_all_endpoints()
|
for endpoint in node_under_load.service(S3Gate).get_all_endpoints()
|
||||||
)
|
)
|
||||||
),
|
),
|
||||||
EndpointSelectionStrategy.FIRST: list(
|
EndpointSelectionStrategy.FIRST: list(
|
||||||
set(
|
set(
|
||||||
node_under_load.service(S3Gate).get_endpoint().replace("http://", "")
|
node_under_load.service(S3Gate)
|
||||||
|
.get_endpoint()
|
||||||
|
.replace("http://", "")
|
||||||
|
.replace("https://", "")
|
||||||
for node_under_load in self.nodes_under_load
|
for node_under_load in self.nodes_under_load
|
||||||
)
|
)
|
||||||
),
|
),
|
||||||
|
@ -131,8 +134,13 @@ class BackgroundLoadController:
|
||||||
@reporter.step_deco("Startup load")
|
@reporter.step_deco("Startup load")
|
||||||
def startup(self):
|
def startup(self):
|
||||||
self.prepare()
|
self.prepare()
|
||||||
|
self.preset()
|
||||||
self.start()
|
self.start()
|
||||||
|
|
||||||
|
@run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED)
|
||||||
|
def preset(self):
|
||||||
|
self.runner.preset()
|
||||||
|
|
||||||
@run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED)
|
@run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED)
|
||||||
@reporter.step_deco("Stop and get results of load")
|
@reporter.step_deco("Stop and get results of load")
|
||||||
def teardown(self, load_report: Optional[LoadReport] = None):
|
def teardown(self, load_report: Optional[LoadReport] = None):
|
||||||
|
|
2
src/frostfs_testlib/testing/__init__.py
Normal file
2
src/frostfs_testlib/testing/__init__.py
Normal file
|
@ -0,0 +1,2 @@
|
||||||
|
from frostfs_testlib.testing.parallel import parallel
|
||||||
|
from frostfs_testlib.testing.test_control import expect_not_raises, run_optionally, wait_for_success
|
98
src/frostfs_testlib/testing/parallel.py
Normal file
98
src/frostfs_testlib/testing/parallel.py
Normal file
|
@ -0,0 +1,98 @@
|
||||||
|
import itertools
|
||||||
|
from concurrent.futures import Future, ThreadPoolExecutor
|
||||||
|
from typing import Callable, Collection, Optional, Union
|
||||||
|
|
||||||
|
|
||||||
|
def parallel(
|
||||||
|
fn: Union[Callable, list[Callable]],
|
||||||
|
parallel_items: Optional[Collection] = None,
|
||||||
|
*args,
|
||||||
|
**kwargs,
|
||||||
|
) -> list[Future]:
|
||||||
|
"""Parallel execution of selected function or list of function using ThreadPoolExecutor.
|
||||||
|
Also checks the exceptions of each thread.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
fn: function(s) to run. Can work in 2 modes:
|
||||||
|
1. If you have dedicated function with some items to process in parallel,
|
||||||
|
like you do with executor.map(fn, parallel_items), pass this function as fn.
|
||||||
|
2. If you need to process each item with it's own method, like you do
|
||||||
|
with executor.submit(fn, args, kwargs), pass list of methods here.
|
||||||
|
See examples in runners.py in this repo.
|
||||||
|
parallel_items: items to iterate on (should be None in case of 2nd mode).
|
||||||
|
args: any other args required in target function(s).
|
||||||
|
if any arg is itertool.cycle, it will be iterated before passing to new thread.
|
||||||
|
kwargs: any other kwargs required in target function(s)
|
||||||
|
if any kwarg is itertool.cycle, it will be iterated before passing to new thread.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
list of futures.
|
||||||
|
"""
|
||||||
|
|
||||||
|
if callable(fn):
|
||||||
|
if not parallel_items:
|
||||||
|
raise RuntimeError("Parallel items should not be none when fn is callable.")
|
||||||
|
futures = _run_by_items(fn, parallel_items, *args, **kwargs)
|
||||||
|
elif isinstance(fn, list):
|
||||||
|
futures = _run_by_fn_list(fn, *args, **kwargs)
|
||||||
|
else:
|
||||||
|
raise RuntimeError("Nothing to run. fn should be either callable or list of callables.")
|
||||||
|
|
||||||
|
# Check for exceptions
|
||||||
|
exceptions = [future.exception() for future in futures if future.exception()]
|
||||||
|
if exceptions:
|
||||||
|
message = "\n".join([str(e) for e in exceptions])
|
||||||
|
raise RuntimeError(f"The following exceptions occured during parallel run: {message}")
|
||||||
|
return futures
|
||||||
|
|
||||||
|
|
||||||
|
def _run_by_fn_list(fn_list: list[Callable], *args, **kwargs) -> list[Future]:
|
||||||
|
if not len(fn_list):
|
||||||
|
return []
|
||||||
|
if not all([callable(f) for f in fn_list]):
|
||||||
|
raise RuntimeError("fn_list should contain only callables")
|
||||||
|
|
||||||
|
futures: list[Future] = []
|
||||||
|
|
||||||
|
with ThreadPoolExecutor(max_workers=len(fn_list)) as executor:
|
||||||
|
for fn in fn_list:
|
||||||
|
task_args = _get_args(*args)
|
||||||
|
task_kwargs = _get_kwargs(**kwargs)
|
||||||
|
|
||||||
|
futures.append(executor.submit(fn, *task_args, **task_kwargs))
|
||||||
|
|
||||||
|
return futures
|
||||||
|
|
||||||
|
|
||||||
|
def _run_by_items(fn: Callable, parallel_items: Collection, *args, **kwargs) -> list[Future]:
|
||||||
|
futures: list[Future] = []
|
||||||
|
|
||||||
|
with ThreadPoolExecutor(max_workers=len(parallel_items)) as executor:
|
||||||
|
for item in parallel_items:
|
||||||
|
task_args = _get_args(*args)
|
||||||
|
task_kwargs = _get_kwargs(**kwargs)
|
||||||
|
task_args.insert(0, item)
|
||||||
|
|
||||||
|
futures.append(executor.submit(fn, *task_args, **task_kwargs))
|
||||||
|
|
||||||
|
return futures
|
||||||
|
|
||||||
|
|
||||||
|
def _get_kwargs(**kwargs):
|
||||||
|
actkwargs = {}
|
||||||
|
for key, arg in kwargs.items():
|
||||||
|
if isinstance(arg, itertools.cycle):
|
||||||
|
actkwargs[key] = next(arg)
|
||||||
|
else:
|
||||||
|
actkwargs[key] = arg
|
||||||
|
return actkwargs
|
||||||
|
|
||||||
|
|
||||||
|
def _get_args(*args):
|
||||||
|
actargs = []
|
||||||
|
for arg in args:
|
||||||
|
if isinstance(arg, itertools.cycle):
|
||||||
|
actargs.append(next(arg))
|
||||||
|
else:
|
||||||
|
actargs.append(arg)
|
||||||
|
return actargs
|
|
@ -3,3 +3,4 @@ import frostfs_testlib.utils.datetime_utils
|
||||||
import frostfs_testlib.utils.json_utils
|
import frostfs_testlib.utils.json_utils
|
||||||
import frostfs_testlib.utils.string_utils
|
import frostfs_testlib.utils.string_utils
|
||||||
import frostfs_testlib.utils.wallet_utils
|
import frostfs_testlib.utils.wallet_utils
|
||||||
|
from frostfs_testlib.utils.file_keeper import FileKeeper
|
||||||
|
|
Loading…
Reference in a new issue