Concurrent Stack
- class thread_factory.concurrency.concurrent_stack.ConcurrentStack(initial: Iterable[_T] | None = None)[source]
Bases:
Generic[_T],IDisposableA thread-safe LIFO stack implementation using an underlying deque, a reentrant lock for synchronization, and an atomic counter for fast retrieval of the number of items.
This class mimics common stack behaviors (push, pop, peek, etc.). It is designed for Python 3.13+ No-GIL environments (though it will work fine in standard Python as well).
- batch_update(func: Callable[[Deque[_T]], None]) None[source]
Perform a batch update on the stack under a single lock acquisition. This method allows multiple operations to be performed atomically.
- Parameters:
func (Callable[[Deque[_T]], None]) – A function that accepts the internal deque as its only argument. The function should perform all necessary mutations.
- copy() ConcurrentStack[_T][source]
Return a shallow copy of the ConcurrentStack.
- Returns:
A new ConcurrentStack with the same items.
- Return type:
ConcurrentStack[_T]
- dispose() None[source]
Dispose (clear) this ConcurrentStack, releasing its contents.
Once disposed, _disposed becomes True and the internal dict is cleared. No further usage checks are enforced, so the user must avoid calling other methods after disposal.
This method is idempotent — multiple calls won’t cause errors.
- filter(func: Callable[[_T], bool]) ConcurrentStack[_T][source]
Filter elements based on a function and return a new ConcurrentStack.
- Parameters:
func (callable) – The filter function returning True if item should be kept.
- Returns:
A new stack containing only elements where func(item) is True.
- Return type:
ConcurrentStack[_T]
- is_empty() bool[source]
Check if the stack is empty.
- Returns:
True if the stack is empty, False otherwise.
- Return type:
bool
- map(func: Callable[[_T], Any]) ConcurrentStack[Any][source]
Apply a function to all elements and return a new ConcurrentStack.
- Parameters:
func (callable) – The function to apply to each item.
- Returns:
A new stack with func applied to each element.
- Return type:
ConcurrentStack[Any]
- peek() _T[source]
Return (but do not remove) the item at the top of the stack.
- Raises:
IndexError – If the stack is empty.
- Returns:
The item at the top of the stack.
- Return type:
_T
- pop() _T[source]
Remove and return an item from the top of the stack.
- Raises:
IndexError – If the stack is empty.
- Returns:
The item popped.
- Return type:
_T
- push(item: _T) None[source]
Push an item onto the top of the stack (LIFO).
- Parameters:
item (_T) – The item to push.
- reduce(func: Callable[[Any, _T], Any], initial: Any | None = None) Any[source]
Apply a function of two arguments cumulatively to the items of the stack.
- Parameters:
func (Callable[[Any, _T], Any]) – Function of the form func(accumulator, item).
initial (optional) – Starting value.
- Returns:
The reduced value.
- Return type:
Any
- Raises:
TypeError – If the stack is empty and no initial value is provided.
Example
- def add(acc, x):
return acc + x
total = concurrent_stack.reduce(add, 0)
- remove_item(item: _T) bool[source]
Remove the first occurrence of the item by identity (memory reference).
- Parameters:
item (_T) – The item to remove.
- Returns:
True if the item was found and removed, False otherwise.
- Return type:
bool
- steal_batch(max_items: int = 4) ConcurrentList[_T][source]
Atomically steal up to max_items from the head of the stack.
This is used in work-stealing contexts where idle threads pull work from the front (FIFO) of another thread’s stack. Returned tasks are reversed to maintain correct execution order (LIFO).
- Parameters:
max_items (int) – Maximum number of items to steal.
- Returns:
The stolen items, ordered for FIFO execution.
- Return type:
ConcurrentList[_T]
- to_concurrent_list() ConcurrentList[_T][source]
Return a shallow copy of the stack as a ConcurrentList.
- Returns:
A concurrency list containing all items currently in the stack.
- Return type:
ConcurrentList[_T]