Concurrent Bag
- class thread_factory.concurrency.concurrent_bag.ConcurrentBag(initial: List[_T] | None = None)
Bases: Generic[_T], IDisposable
A thread-safe multiset (“bag”) implementation built on a dict that maps each item to an integer count and an RLock for synchronization.
Items can appear multiple times, unlike in a standard set. This class is designed for Python 3.13+ No-GIL (free-threaded) environments, though it also works in standard Python.
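The dict-plus-RLock design described above can be illustrated with a minimal sketch. SketchBag and add_many are hypothetical names used only for illustration; this is not the library's implementation, and returning 0 for an absent item is an assumption.

```python
import threading


class SketchBag:
    """Hypothetical stand-in for illustration; not the library's ConcurrentBag."""

    def __init__(self, initial=None):
        self._lock = threading.RLock()
        self._counts = {}  # item -> number of occurrences
        for item in initial or []:
            self.add(item)

    def add(self, item):
        # The read-modify-write on the count happens under the lock.
        with self._lock:
            self._counts[item] = self._counts.get(item, 0) + 1

    def count_of(self, item):
        with self._lock:
            return self._counts.get(item, 0)  # assumption: absent items count as 0


def add_many(bag, item, times):
    for _ in range(times):
        bag.add(item)


bag = SketchBag(["a"])
threads = [threading.Thread(target=add_many, args=(bag, "a", 1000)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

total = bag.count_of("a")      # 1 initial occurrence + 4 * 1000 additions
missing = bag.count_of("zzz")  # never added
```

Because each increment runs under the lock, no addition is lost even when several threads add the same item at once.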
- add(item: _T) -> None
Add one occurrence of item to the bag.
- Parameters:
item (_T) – The item to add.
- batch_update(func: Callable[[Dict[_T, int]], None]) -> None
Perform a batch update on the bag under a single lock acquisition. This method allows multiple operations to be performed atomically.
- Parameters:
func (Callable[[Dict[_T, int]], None]) – A function that accepts the internal dictionary (item->count) as its only argument. The function should perform all necessary mutations.
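As a rough sketch of the idea, the callback below changes two counts under one lock acquisition. SketchBag, snapshot, and swap_two_apples_for_pears are hypothetical names; the real class may differ in detail.

```python
import threading
from typing import Callable, Dict


class SketchBag:
    """Hypothetical stand-in; not the library's ConcurrentBag."""

    def __init__(self, counts: Dict[str, int]):
        self._lock = threading.RLock()
        self._counts = dict(counts)

    def batch_update(self, func: Callable[[Dict[str, int]], None]) -> None:
        # One lock acquisition covers every mutation made by func.
        with self._lock:
            func(self._counts)

    def snapshot(self) -> Dict[str, int]:
        with self._lock:
            return dict(self._counts)


def swap_two_apples_for_pears(counts: Dict[str, int]) -> None:
    # Both changes become visible together, as long as every other
    # operation on the bag takes the same lock.
    counts["apple"] -= 2
    counts["pear"] = counts.get("pear", 0) + 2


bag = SketchBag({"apple": 3})
bag.batch_update(swap_two_apples_for_pears)
result = bag.snapshot()
```

Since the callback receives the internal dictionary directly, it is presumably responsible for leaving the counts in a sensible state (for example, no negative values).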
- copy() -> ConcurrentBag[_T]
Return a shallow copy of the ConcurrentBag.
- Returns:
A new ConcurrentBag with the same items and counts.
- Return type:
ConcurrentBag[_T]
- count_of(item: _T) -> int
Return how many times item appears in the bag.
- Parameters:
item (_T) – The item to count.
- Returns:
The number of occurrences of this item.
- Return type:
int
- discard(item: _T) -> None
Remove one occurrence of item from the bag if present, but do nothing if the item is not in the bag.
- dispose() -> None
Disposes of this ConcurrentBag, releasing all internal resources.
- Responsibilities:
Clears the internal bag, removing all items.
Sets the disposed flag to True, marking this object as no longer valid.
Emits a warning to notify that the object has been disposed.
- Behavior:
This method is idempotent: subsequent calls have no effect after the first.
No automatic usage checks are enforced after disposal; it is the user’s responsibility to avoid further operations.
Notes
Designed for consistency with deterministic resource management patterns seen in systems programming (e.g., RAII, IDisposable).
Disposal does NOT release the lock itself since locks are acquired per operation.
Example
with ConcurrentBag(…) as bag:
    bag.add(42)
# bag is now automatically disposed and cleared
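The disposal pattern above can be sketched with a small context manager. SketchResource is a hypothetical class; wiring __exit__ to dispose() is an assumption inferred from the example, and the warning mentioned under Responsibilities is left out for brevity.

```python
class SketchResource:
    """Hypothetical stand-in showing idempotent disposal; not the library class."""

    def __init__(self):
        self._counts = {}
        self.disposed = False

    def add(self, item):
        self._counts[item] = self._counts.get(item, 0) + 1

    def count_of(self, item):
        return self._counts.get(item, 0)

    def dispose(self):
        if self.disposed:
            return  # idempotent: later calls do nothing
        self._counts.clear()
        self.disposed = True

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.dispose()
        return False  # do not swallow exceptions raised inside the block


with SketchResource() as res:
    res.add(42)
    inside = res.count_of(42)

after = res.count_of(42)  # the contents were cleared on exit
res.dispose()             # second call is a no-op
```

As in the documented behavior, nothing in this sketch stops a caller from using the object after disposal; it simply finds the contents cleared.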
- filter(predicate: Callable[[_T], bool]) -> ConcurrentBag[_T]
Return a new bag containing only the items for which predicate(item) is True.
- Parameters:
predicate (Callable[[_T], bool]) – A function returning True if an item should be kept, False otherwise.
- Returns:
A new bag containing only the items that passed the filter.
- Return type:
ConcurrentBag[_T]
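A minimal sketch of the filtering semantics, using collections.Counter as a stand-in for the bag. filter_counts is a hypothetical helper, and keeping each surviving item's original count is an assumption.

```python
from collections import Counter


def filter_counts(counts, predicate):
    # Assumption: each surviving item keeps its original count.
    return Counter({item: n for item, n in counts.items() if predicate(item)})


numbers = Counter({1: 2, 2: 1, 4: 3})
evens = filter_counts(numbers, lambda x: x % 2 == 0)
```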
- map(func: Callable[[_T], _T]) -> ConcurrentBag[_T]
Apply a function to each item and return a new ConcurrentBag with the transformed items. Note that if func(x) == func(y) for distinct items x and y, the new bag merges them into a single entry.
- Parameters:
func (Callable[[_T], _T]) – A function to apply to each item.
- Returns:
A new bag with the transformed items.
- Return type:
ConcurrentBag[_T]
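The merging behavior is easiest to see in a small sketch. It uses collections.Counter as a stand-in for the bag; map_counts is a hypothetical helper, and summing the counts of merged items is an assumption about what "merges" means here.

```python
from collections import Counter


def map_counts(counts, func):
    result = Counter()
    for item, n in counts.items():
        # Items that map to the same value land on the same key;
        # assumption: their counts are added together.
        result[func(item)] += n
    return result


words = Counter({"Apple": 2, "apple": 1, "pear": 1})
lowered = map_counts(words, str.lower)
```

Here "Apple" and "apple" both map to "apple", so the result holds one entry for it rather than two.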
- pop() -> _T
Remove and return a single occurrence of an arbitrary item from the bag.
- Raises:
KeyError – If the bag is empty.
- Returns:
An item that was removed.
- Return type:
_T
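One way to read this contract is as a drain loop that stops on KeyError. The sketch below works on a plain dict of counts; pop_one is a hypothetical helper, and picking the first key is just one possible meaning of "arbitrary".

```python
def pop_one(counts):
    if not counts:
        raise KeyError("pop from an empty bag")
    item = next(iter(counts))  # "arbitrary": here, whichever key comes first
    counts[item] -= 1
    if counts[item] == 0:
        del counts[item]  # drop items whose count reaches zero
    return item


counts = {"job": 2, "task": 1}
drained = []
while True:
    try:
        drained.append(pop_one(counts))
    except KeyError:
        break  # the bag is empty
```

With a shared bag, catching KeyError is generally safer than checking for emptiness first, because another thread could take the last item between the check and the pop.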
- reduce(func: Callable[[Any, _T], Any], initial: Any | None = None) -> Any
Apply a function of two arguments cumulatively to the items in the bag, passing each item as many times as its count.
- Parameters:
func (Callable[[Any, _T], Any]) – A function taking (accumulator, item).
initial (Any, optional) – A starting value for the accumulator.
- Returns:
The reduced value.
- Return type:
Any
- Raises:
TypeError – If the bag is empty and no initial value is provided.
Example
# Summation of all numeric items in the bag:
def add(acc, x):
    return acc + x

total = concurrent_bag.reduce(add, 0)
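To make the "as many times as its count" rule concrete, here is a standalone sketch built on functools.reduce and Counter.elements(). reduce_counts is a hypothetical helper, and treating initial=None as "no initial value" is an assumption based on the signature.

```python
from collections import Counter
from functools import reduce


def reduce_counts(counts, func, initial=None):
    items = counts.elements()  # yields each item once per occurrence
    if initial is None:
        return reduce(func, items)  # raises TypeError if there are no items
    return reduce(func, items, initial)


counts = Counter({2: 3, 5: 1})
total = reduce_counts(counts, lambda acc, x: acc + x, 0)  # 2 + 2 + 2 + 5

try:
    reduce_counts(Counter(), lambda acc, x: acc + x)
    empty_error = None
except TypeError as exc:
    empty_error = exc
```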
- remove(item: _T) -> None
Remove one occurrence of item from the bag.
- Raises:
KeyError – If the item is not present in the bag.
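The difference between remove and discard is only in how a missing item is handled. A hedged sketch on a plain dict of counts, where remove_one and discard_one are hypothetical helpers:

```python
def remove_one(counts, item):
    if counts.get(item, 0) <= 0:
        raise KeyError(item)  # strict: a missing item is an error
    counts[item] -= 1
    if counts[item] == 0:
        del counts[item]


def discard_one(counts, item):
    try:
        remove_one(counts, item)
    except KeyError:
        pass  # lenient: a missing item is silently ignored


counts = {"x": 1}
remove_one(counts, "x")   # removes the only occurrence
discard_one(counts, "x")  # nothing left to remove, no error

try:
    remove_one(counts, "x")
    raised = False
except KeyError:
    raised = True
```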
- to_concurrent_dict() -> ConcurrentDict[_T, int]
Return a shallow copy of the internal dictionary (item -> count), wrapped in a ConcurrentDict.
- Returns:
A concurrent dictionary of items to counts.
- Return type:
ConcurrentDict[_T, int]
- unique_items() -> List[_T]
Return a list of distinct items present in the bag (each item only once).
- Returns:
A snapshot of all unique items.
- Return type:
List[_T]
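"Snapshot" here presumably means the returned list does not change when the bag changes later. The sketch below contrasts a copied list with a live dict view; unique_items_of is a hypothetical helper, not the library method.

```python
import threading

lock = threading.RLock()
counts = {"a": 2, "b": 1}


def unique_items_of(counts, lock):
    # Copy the keys while holding the lock so the result is a stable snapshot.
    with lock:
        return list(counts)


snapshot = unique_items_of(counts, lock)
live_view = counts.keys()  # a view that keeps tracking the dict

counts["c"] = 1  # later mutation

in_snapshot = "c" in snapshot
in_live_view = "c" in live_view
```

The snapshot can be iterated safely while other threads keep modifying the bag, at the cost of possibly being out of date.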
- update(other: ConcurrentBag[_T]) -> None
Update this bag with items from another bag, adding their counts.
- Parameters:
other (ConcurrentBag[_T]) – Another bag to merge into this one.
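The count-adding merge works the same way as collections.Counter.update, which is used below as a stand-in for the bag. The variable names are illustrative only.

```python
from collections import Counter

mine = Counter({"x": 1, "y": 2})
other = Counter({"y": 3, "z": 1})

# Counter.update adds counts instead of replacing them, which matches
# the "adding their counts" wording above.
mine.update(other)
```

After the merge, "y" has 2 + 3 = 5 occurrences, and other is left unchanged.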