frostfs-testlib/src/frostfs_testlib/testing/parallel.py

101 lines
3.5 KiB
Python
Raw Normal View History

import itertools
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Callable, Collection, Optional, Union
MAX_WORKERS = 50
def parallel(
fn: Union[Callable, list[Callable]],
parallel_items: Optional[Collection] = None,
*args,
**kwargs,
) -> list[Future]:
"""Parallel execution of selected function or list of function using ThreadPoolExecutor.
Also checks the exceptions of each thread.
Args:
fn: function(s) to run. Can work in 2 modes:
1. If you have dedicated function with some items to process in parallel,
like you do with executor.map(fn, parallel_items), pass this function as fn.
2. If you need to process each item with it's own method, like you do
with executor.submit(fn, args, kwargs), pass list of methods here.
See examples in runners.py in this repo.
parallel_items: items to iterate on (should be None in case of 2nd mode).
args: any other args required in target function(s).
if any arg is itertool.cycle, it will be iterated before passing to new thread.
kwargs: any other kwargs required in target function(s)
if any kwarg is itertool.cycle, it will be iterated before passing to new thread.
Returns:
list of futures.
"""
if callable(fn):
if not parallel_items:
raise RuntimeError("Parallel items should not be none when fn is callable.")
futures = _run_by_items(fn, parallel_items, *args, **kwargs)
elif isinstance(fn, list):
futures = _run_by_fn_list(fn, *args, **kwargs)
else:
raise RuntimeError("Nothing to run. fn should be either callable or list of callables.")
# Check for exceptions
exceptions = [future.exception() for future in futures if future.exception()]
if exceptions:
message = "\n".join([str(e) for e in exceptions])
raise RuntimeError(f"The following exceptions occured during parallel run:\n{message}")
return futures
def _run_by_fn_list(fn_list: list[Callable], *args, **kwargs) -> list[Future]:
if not len(fn_list):
return []
if not all([callable(f) for f in fn_list]):
raise RuntimeError("fn_list should contain only callables")
futures: list[Future] = []
with ThreadPoolExecutor(max_workers=min(len(fn_list), MAX_WORKERS)) as executor:
for fn in fn_list:
task_args = _get_args(*args)
task_kwargs = _get_kwargs(**kwargs)
futures.append(executor.submit(fn, *task_args, **task_kwargs))
return futures
def _run_by_items(fn: Callable, parallel_items: Collection, *args, **kwargs) -> list[Future]:
futures: list[Future] = []
with ThreadPoolExecutor(max_workers=min(len(parallel_items), MAX_WORKERS)) as executor:
for item in parallel_items:
task_args = _get_args(*args)
task_kwargs = _get_kwargs(**kwargs)
task_args.insert(0, item)
futures.append(executor.submit(fn, *task_args, **task_kwargs))
return futures
def _get_kwargs(**kwargs):
actkwargs = {}
for key, arg in kwargs.items():
if isinstance(arg, itertools.cycle):
actkwargs[key] = next(arg)
else:
actkwargs[key] = arg
return actkwargs
def _get_args(*args):
actargs = []
for arg in args:
if isinstance(arg, itertools.cycle):
actargs.append(next(arg))
else:
actargs.append(arg)
return actargs