Concurrent Tools
- class thread_factory.utilities.concurrent_tools.concurrent_tools.ConcurrentTools
Bases: object
- A Python class that mimics .NET’s Task Parallel Library (TPL)-style operations:
for_loop
for_each
invoke
map
- Optional features include:
Local state management for for_loop (via local_init/local_body/local_finalize)
Streaming mode in for_each to avoid loading the entire iterable into memory
stop_on_exception to cancel remaining chunks if an exception occurs in one chunk
Explicit default for max_workers using os.cpu_count()
chunk_size logic that tries to create roughly 4 chunks per executor by default
- NOTICE:
This class accepts user-defined functions and runs them concurrently. It does not enforce thread safety. If your functions modify shared state or access shared resources, you are responsible for implementing your own thread-safety mechanisms (e.g., locks, thread-local storage, or other synchronization primitives).
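As a minimal sketch of the kind of synchronization this notice refers to, the example below guards a shared counter with a threading.Lock. It uses the standard library's ThreadPoolExecutor as a stand-in for the parallel call; the names tally, tally_lock, and record are hypothetical and are not part of ConcurrentTools.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# Hypothetical shared state touched by every worker thread.
tally = {"count": 0}
tally_lock = threading.Lock()


def record(_item: int) -> None:
    """Action that mutates shared state, so the update is guarded by a lock."""
    with tally_lock:
        tally["count"] += 1


# Stand-in for a parallel for-each; ConcurrentTools itself is not called here.
with ThreadPoolExecutor(max_workers=4) as pool:
    list(pool.map(record, range(1000)))

print(tally["count"])  # 1000
```

Without the lock, the read-modify-write on the counter could interleave across threads and lose updates.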
- static for_each(iterable: Iterable[_T], action: Callable[[_T], None], *, max_workers: int | None = None, chunk_size: int | None = None, stop_on_exception: bool = False, streaming: bool = False) -> None
Execute the given action for each item in the iterable in parallel.
- Parameters:
iterable – The data to process. Can be any iterable.
action – The function to apply to each item.
max_workers – Maximum number of threads. Defaults to CPU count.
chunk_size – Number of items per chunk. Defaults to roughly a quarter of each thread's share of the items, i.e. about 4 chunks per thread (unless streaming).
stop_on_exception – If True, once an exception occurs on one chunk, remaining chunks are canceled.
streaming – If True, process the iterable in a streaming manner (avoid loading the entire list into memory).
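The parameters above can be illustrated with a small standard-library sketch. This is not the library's implementation: for_each_sketch is a hypothetical stand-in that covers only the non-streaming path, and both the default chunk size formula (about 4 chunks per worker) and the cancellation strategy are assumptions based on the descriptions in this section.

```python
import math
import os
import threading
from concurrent.futures import ALL_COMPLETED, FIRST_EXCEPTION, ThreadPoolExecutor, wait
from typing import Callable, Iterable, Optional, TypeVar

T = TypeVar("T")


def for_each_sketch(
    iterable: Iterable[T],
    action: Callable[[T], None],
    *,
    max_workers: Optional[int] = None,
    chunk_size: Optional[int] = None,
    stop_on_exception: bool = False,
) -> None:
    """Hypothetical stand-in for for_each (non-streaming path only)."""
    items = list(iterable)  # materializes the input, unlike streaming mode
    workers = max_workers or os.cpu_count() or 1
    if chunk_size is None:
        # Assumed default: about 4 chunks per worker thread.
        chunk_size = max(1, math.ceil(len(items) / (workers * 4)))
    chunks = [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]

    def run_chunk(chunk):
        for item in chunk:
            action(item)

    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = [pool.submit(run_chunk, chunk) for chunk in chunks]
        mode = FIRST_EXCEPTION if stop_on_exception else ALL_COMPLETED
        done, pending = wait(futures, return_when=mode)
        for future in pending:
            future.cancel()  # only chunks that have not started can be cancelled
    for future in done:
        future.result()  # re-raises the exception from a failed chunk, if any


seen = []
seen_lock = threading.Lock()


def remember(item: int) -> None:
    with seen_lock:
        seen.append(item)


for_each_sketch(range(20), remember, max_workers=4)
print(sorted(seen) == list(range(20)))  # True
```

Items are processed in no guaranteed order, which is why the usage compares the sorted result.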
- static for_loop(start: int, stop: int, body: Callable[[int], None], *, max_workers: int | None = None, chunk_size: int | None = None, stop_on_exception: bool = False, local_init: Callable[[], Any] | None = None, local_body: Callable[[int, Any], None] | None = None, local_finalize: Callable[[Any], None] | None = None) -> None
Execute the given ‘body’ (or ‘local_body’) for each integer in [start, stop) in parallel, optionally with local state initialization/finalization.
- Parameters:
start – The first integer index (inclusive).
stop – The stopping integer index (exclusive).
body – The function to apply to each integer (if no local state is used).
max_workers – The maximum number of threads to use. Defaults to CPU count.
chunk_size – Size of each chunk of indices to process. Defaults to roughly a quarter of each thread's share of the work, i.e. about 4 chunks per thread.
stop_on_exception – If True, once an exception occurs in one chunk, remaining chunks will be canceled.
local_init – Optional function that initializes a local state object for each thread.
local_body – Optional function that uses the local state object. Called for each index if local state is used.
local_finalize – Optional function to finalize the local state for each thread after all items in the chunk are processed.
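The local-state pattern described above can be sketched with the standard library. for_loop_sketch is a hypothetical stand-in, not the library's code; it assumes the state is created and finalized once per chunk, and that local_body mutates the state in place since its signature returns None.

```python
import math
import os
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Optional


def for_loop_sketch(
    start: int,
    stop: int,
    *,
    local_init: Callable[[], Any],
    local_body: Callable[[int, Any], None],
    local_finalize: Callable[[Any], None],
    max_workers: Optional[int] = None,
    chunk_size: Optional[int] = None,
) -> None:
    """Hypothetical stand-in for for_loop with local state (assumed per chunk)."""
    workers = max_workers or os.cpu_count() or 1
    total = max(0, stop - start)
    if chunk_size is None:
        # Assumed default: about 4 chunks per worker thread.
        chunk_size = max(1, math.ceil(total / (workers * 4)))

    def run_chunk(lo: int, hi: int) -> None:
        state = local_init()        # fresh state, never shared between chunks
        for index in range(lo, hi):
            local_body(index, state)
        local_finalize(state)       # merge the partial result exactly once

    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = [
            pool.submit(run_chunk, lo, min(lo + chunk_size, stop))
            for lo in range(start, stop, chunk_size)
        ]
    for future in futures:
        future.result()  # surface any exception raised inside a chunk


grand_total = {"value": 0}
merge_lock = threading.Lock()


def init_partial() -> dict:
    return {"partial": 0}


def add_square(index: int, state: dict) -> None:
    state["partial"] += index * index  # no lock needed: state is chunk-local


def merge_partial(state: dict) -> None:
    with merge_lock:  # the shared total still needs synchronization
        grand_total["value"] += state["partial"]


for_loop_sketch(0, 100, local_init=init_partial, local_body=add_square,
                local_finalize=merge_partial, max_workers=4)
print(grand_total["value"])  # 328350
```

The design point is that the lock is taken once per chunk in the finalizer rather than once per index in the body.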
- static invoke(*functions: Callable[[], Any], wait: bool = True, max_workers: int | None = None) -> List[Future]
Execute multiple functions in parallel. Optionally wait for all functions to complete before returning.
- Parameters:
functions – Any number of callable objects (no arguments).
wait – If True (default), block until all functions finish.
max_workers – The maximum number of threads to use. Defaults to CPU count.
- Returns:
A list of Future objects. If wait=True, the futures will already be complete.
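The contract above (submit every callable, optionally block, return the futures) can be approximated as follows. invoke_sketch is a hypothetical stand-in built on ThreadPoolExecutor and is only an assumption about how such a method might behave.

```python
import os
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable, List, Optional


def invoke_sketch(
    *functions: Callable[[], Any],
    wait: bool = True,
    max_workers: Optional[int] = None,
) -> List[Future]:
    """Hypothetical stand-in for invoke: run zero-argument callables in parallel."""
    workers = max_workers or os.cpu_count() or 1
    pool = ThreadPoolExecutor(max_workers=workers)
    futures = [pool.submit(function) for function in functions]
    # With wait=True this blocks until every submitted callable has finished;
    # with wait=False it returns immediately and the work continues in the background.
    pool.shutdown(wait=wait)
    return futures


futures = invoke_sketch(lambda: 1 + 1, lambda: "ready", lambda: sum(range(10)))
results = [future.result() for future in futures]
print(results)  # [2, 'ready', 45]
```

Because the futures are returned in submission order, results line up with the order in which the callables were passed.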
- static map(iterable: Iterable[_T], transform: Callable[[_T], _R], *, max_workers: int | None = None, chunk_size: int | None = None) -> List[_R]
Transform each element of ‘iterable’ in parallel and return the results in the original order. Similar to built-in map, but parallelized.
- Parameters:
iterable – The data to process in parallel.
transform – The function that transforms each item.
max_workers – The maximum number of threads to use. Defaults to CPU count.
chunk_size – Number of items per chunk. Defaults to roughly a quarter of each thread's share of the items, i.e. about 4 chunks per thread.
- Returns:
A list of transformed items in the same order as the original iterable.
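One way to get order-preserving results from chunked parallel work is sketched below. map_sketch is a hypothetical stand-in, not the library's implementation; it relies on Executor.map yielding results in submission order, and its default chunk size is an assumption.

```python
import math
import os
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable, List, Optional, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def map_sketch(
    iterable: Iterable[T],
    transform: Callable[[T], R],
    *,
    max_workers: Optional[int] = None,
    chunk_size: Optional[int] = None,
) -> List[R]:
    """Hypothetical stand-in for map: parallel transform with input order kept."""
    items = list(iterable)
    workers = max_workers or os.cpu_count() or 1
    if chunk_size is None:
        # Assumed default: about 4 chunks per worker thread.
        chunk_size = max(1, math.ceil(len(items) / (workers * 4)))
    chunks = [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]

    def run_chunk(chunk: List[T]) -> List[R]:
        return [transform(item) for item in chunk]

    with ThreadPoolExecutor(max_workers=workers) as pool:
        # Executor.map yields chunk results in the order the chunks were submitted,
        # regardless of which chunk finishes first.
        chunk_results = pool.map(run_chunk, chunks)
        return [result for chunk in chunk_results for result in chunk]


squares = map_sketch(range(10), lambda x: x * x, max_workers=3)
print(squares)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```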