frostfs-testlib/src/frostfs_testlib/testing/parallel.py
a.berezin 95b32a036a
Some checks failed
DCO action / DCO (pull_request) Has been cancelled
[#316] Extend parallel exception message output
Signed-off-by: a.berezin <a.berezin@yadro.com>
2024-11-12 12:28:10 +03:00

148 lines
5.7 KiB
Python

import itertools
import traceback
from concurrent.futures import Future, ThreadPoolExecutor
from contextlib import contextmanager
from typing import Callable, Collection, Optional, Union
MAX_WORKERS = 50
@contextmanager
def parallel_workers_limit(workers_count: int):
global MAX_WORKERS
original_value = MAX_WORKERS
MAX_WORKERS = workers_count
try:
yield
finally:
MAX_WORKERS = original_value
def parallel(
fn: Union[Callable, list[Callable]],
parallel_items: Optional[Collection] = None,
*args,
**kwargs,
) -> list[Future]:
"""Parallel execution of selected function or list of function using ThreadPoolExecutor.
Also checks the exceptions of each thread.
Args:
fn: function(s) to run. Can work in 2 modes:
1. If you have dedicated function with some items to process in parallel,
like you do with executor.map(fn, parallel_items), pass this function as fn.
2. If you need to process each item with it's own method, like you do
with executor.submit(fn, args, kwargs), pass list of methods here.
See examples in runners.py in this repo.
parallel_items: items to iterate on (should be None in case of 2nd mode).
args: any other args required in target function(s).
if any arg is itertool.cycle, it will be iterated before passing to new thread.
kwargs: any other kwargs required in target function(s)
if any kwarg is itertool.cycle, it will be iterated before passing to new thread.
Returns:
list of futures.
"""
if callable(fn):
if not parallel_items:
raise RuntimeError("Parallel items should not be none when fn is callable.")
futures = _run_by_items(fn, parallel_items, *args, **kwargs)
elif isinstance(fn, list):
futures = _run_by_fn_list(fn, *args, **kwargs)
else:
raise RuntimeError("Nothing to run. fn should be either callable or list of callables.")
# Check for exceptions
exceptions = [future.exception() for future in futures if future.exception()]
if exceptions:
# Prettify exception in parallel with all underlying stack traces
# For example, we had 3 RuntimeError exceptions during parallel. This format will give us something like
#
# RuntimeError: The following exceptions occured during parallel run:
# 1) Exception one text
# 2) Exception two text
# 3) Exception three text
# TRACES:
# ==== 1 ====
# Traceback (most recent call last):
# File "/usr/lib/python3.10/concurrent/futures/thread.py", line 58, in run
# result = self.fn(*self.args, **self.kwargs)
# File "frostfs_testcases/pytest_tests/testsuites/object/test_object_tombstone.py", line 17, in check_service
# raise RuntimeError(f"Exception one text")
# RuntimeError: Exception one text
#
# ==== 2 ====
# Traceback (most recent call last):
# File "/usr/lib/python3.10/concurrent/futures/thread.py", line 58, in run
# result = self.fn(*self.args, **self.kwargs)
# File "frostfs_testcases/pytest_tests/testsuites/object/test_object_tombstone.py", line 17, in check_service
# raise RuntimeError(f"Exception two text")
# RuntimeError: Exception two text
#
# ==== 3 ====
# Traceback (most recent call last):
# File "/usr/lib/python3.10/concurrent/futures/thread.py", line 58, in run
# result = self.fn(*self.args, **self.kwargs)
# File "frostfs_testcases/pytest_tests/testsuites/object/test_object_tombstone.py", line 17, in check_service
# raise RuntimeError(f"Exception three text")
# RuntimeError: Exception three text
short_summary = "\n".join([f"{i}) {str(e)}" for i, e in enumerate(exceptions, 1)])
stack_traces = "\n".join(
[f"==== {i} ====\n{''.join(traceback.TracebackException.from_exception(e).format())}" for i, e in enumerate(exceptions, 1)]
)
message = f"{short_summary}\nTRACES:\n{stack_traces}"
raise RuntimeError(f"The following exceptions occured during parallel run:\n{message}")
return futures
def _run_by_fn_list(fn_list: list[Callable], *args, **kwargs) -> list[Future]:
if not len(fn_list):
return []
if not all([callable(f) for f in fn_list]):
raise RuntimeError("fn_list should contain only callables")
futures: list[Future] = []
with ThreadPoolExecutor(max_workers=min(len(fn_list), MAX_WORKERS)) as executor:
for fn in fn_list:
task_args = _get_args(*args)
task_kwargs = _get_kwargs(**kwargs)
futures.append(executor.submit(fn, *task_args, **task_kwargs))
return futures
def _run_by_items(fn: Callable, parallel_items: Collection, *args, **kwargs) -> list[Future]:
futures: list[Future] = []
with ThreadPoolExecutor(max_workers=min(len(parallel_items), MAX_WORKERS)) as executor:
for item in parallel_items:
task_args = _get_args(*args)
task_kwargs = _get_kwargs(**kwargs)
task_args.insert(0, item)
futures.append(executor.submit(fn, *task_args, **task_kwargs))
return futures
def _get_kwargs(**kwargs):
actkwargs = {}
for key, arg in kwargs.items():
if isinstance(arg, itertools.cycle):
actkwargs[key] = next(arg)
else:
actkwargs[key] = arg
return actkwargs
def _get_args(*args):
actargs = []
for arg in args:
if isinstance(arg, itertools.cycle):
actargs.append(next(arg))
else:
actargs.append(arg)
return actargs