Concurrent Queue
- class thread_factory.concurrency.concurrent_queue.ConcurrentQueue(initial: Iterable[_T] | None = None)[source]
Bases:
Generic[_T],IDisposableA thread-safe FIFO queue implementation using an underlying deque, a reentrant lock for synchronization, and an atomic counter for fast retrieval of the number of items.
This class mimics common queue behaviors (enqueue, dequeue, peek, etc.). It is designed for Python 3.13+ No-GIL environments (though it will work fine in standard Python as well).
- batch_update(func: Callable[[Deque[_T]], None]) None[source]
Perform a batch update on the queue under a single lock acquisition. This method allows multiple operations to be performed atomically.
- Parameters:
func (Callable[[Deque[_T]], None]) – A function that accepts the internal deque as its only argument. The function should perform all necessary mutations.
- copy() ConcurrentQueue[_T][source]
Return a shallow copy of the ConcurrentQueue.
- Returns:
A new ConcurrentQueue with the same items.
- Return type:
ConcurrentQueue[_T]
- dequeue() _T[source]
Remove and return an item from the front of the queue.
- Raises:
IndexError – If the queue is empty.
- Returns:
The item dequeued.
- Return type:
_T
- dispose() None[source]
Dispose (clear) this ConcurrentQueue, releasing its contents.
Once disposed, _disposed becomes True and the internal dict is cleared. No further usage checks are enforced, so the user must avoid calling other methods after disposal.
This method is idempotent — multiple calls won’t cause errors.
- empty()[source]
Return True if the queue has no items.
- Returns:
True if the queue is empty, False otherwise.
- Return type:
bool
- enqueue(item: _T) None[source]
Add an item to the end of the queue (FIFO).
- Parameters:
item (_T) – The item to enqueue.
- filter(func: Callable[[_T], bool]) ConcurrentQueue[_T][source]
Filter elements based on a function and return a new ConcurrentQueue.
- Parameters:
func (callable) – The filter function returning True if item should be kept.
- Returns:
A new queue containing only elements where func(item) is True.
- Return type:
ConcurrentQueue[_T]
- is_empty() bool[source]
Return True if the queue has no items.
- Returns:
True if the queue is empty, False otherwise.
- Return type:
bool
- map(func: Callable[[_T], Any]) ConcurrentQueue[Any][source]
Apply a function to all elements and return a new ConcurrentQueue.
- Parameters:
func (callable) – The function to apply to each item.
- Returns:
A new queue with func applied to each element.
- Return type:
ConcurrentQueue[Any]
- peek() _T[source]
Return (but do not remove) the item at the front of the queue.
- Raises:
IndexError – If the queue is empty.
- Returns:
The item at the front of the queue.
- Return type:
_T
- reduce(func: Callable[[Any, _T], Any], initial: Any | None = None) Any[source]
Apply a function of two arguments cumulatively to the items of the queue.
- Parameters:
func (Callable[[Any, _T], Any]) – Function of the form func(accumulator, item).
initial (optional) – Starting value.
- Returns:
The reduced value.
- Return type:
Any
- Raises:
TypeError – If the queue is empty and no initial value is provided.
Example
- def add(acc, x):
return acc + x
total = concurrent_queue.reduce(add, 0)
- remove_item(item: _T) bool[source]
Remove the first occurrence of the item by identity (memory reference).
- Parameters:
item (_T) – The item to remove.
- Returns:
True if the item was found and removed, False otherwise.
- Return type:
bool
- steal_batch(max_items: int = 4) ConcurrentList[_T][source]
Atomically steal up to max_items from the tail of the queue.
This is used in work-stealing contexts where idle threads pull work from the end (LIFO) of another thread’s queue. Returned tasks are reversed to maintain correct execution order (FIFO).
- Parameters:
max_items (int) – Maximum number of items to steal.
- Returns:
The stolen items, ordered for FIFO execution.
- Return type:
ConcurrentList[_T]
- to_concurrent_list() ConcurrentList[_T][source]
Return a shallow copy of the queue as a ConcurrentList.
- Returns:
A concurrency list containing all items currently in the queue.
- Return type: