Concurrent Bag

class thread_factory.concurrency.concurrent_bag.ConcurrentBag(initial: List[_T] | None = None)

Bases: Generic[_T], IDisposable

A thread-safe multiset (“bag”) implementation using:
  • a dict from item -> integer count

  • an RLock for synchronization

Items can appear multiple times, unlike in a standard set. This class is designed for free-threaded (No-GIL) builds of Python 3.13+, though it also works in standard Python.
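The design described above (a dict of counts guarded by an RLock) can be sketched with the standard library alone. MiniBag below is a hypothetical stand-in written for illustration, not the library's source; only the method names add and count_of are taken from this page.

```python
import threading
from typing import Dict, Generic, Hashable, TypeVar

T = TypeVar("T", bound=Hashable)


class MiniBag(Generic[T]):
    """Hypothetical stand-in: a dict of counts guarded by an RLock."""

    def __init__(self) -> None:
        self._lock = threading.RLock()
        self._counts: Dict[T, int] = {}

    def add(self, item: T) -> None:
        # The read-modify-write on the count happens under the lock,
        # so concurrent adds cannot lose increments.
        with self._lock:
            self._counts[item] = self._counts.get(item, 0) + 1

    def count_of(self, item: T) -> int:
        with self._lock:
            return self._counts.get(item, 0)


bag: MiniBag[str] = MiniBag()
workers = [
    threading.Thread(target=lambda: [bag.add("x") for _ in range(1000)])
    for _ in range(4)
]
for w in workers:
    w.start()
for w in workers:
    w.join()

total = bag.count_of("x")  # 4 threads, 1000 adds each
```

An RLock (rather than a plain Lock) lets a method that already holds the lock call another locked method on the same object without deadlocking.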

add(item: _T) -> None

Add one occurrence of item to the bag.

Parameters:

item (_T) – The item to add.

batch_update(func: Callable[[Dict[_T, int]], None]) -> None

Perform a batch update on the bag under a single lock acquisition. This method allows multiple operations to be performed atomically.

Parameters:

func (Callable[[Dict[_T, int]], None]) – A function that accepts the internal dictionary (item->count) as its only argument. The function should perform all necessary mutations.
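A minimal sketch of the batch pattern, assuming the callback receives the raw item -> count dict while the lock is held. MiniBag and move_apples_to_pears are hypothetical names used only for this illustration.

```python
import threading
from typing import Callable, Dict


class MiniBag:
    """Hypothetical stand-in, not the library's implementation."""

    def __init__(self) -> None:
        self._lock = threading.RLock()
        self._counts: Dict[str, int] = {}

    def batch_update(self, func: Callable[[Dict[str, int]], None]) -> None:
        # One lock acquisition covers every mutation made by func.
        with self._lock:
            func(self._counts)


def move_apples_to_pears(counts: Dict[str, int]) -> None:
    # Other threads see both steps together, or neither.
    moved = counts.pop("apple", 0)
    if moved:
        counts["pear"] = counts.get("pear", 0) + moved


bag = MiniBag()
bag.batch_update(lambda counts: counts.update({"apple": 3, "pear": 1}))
bag.batch_update(move_apples_to_pears)

snapshot: Dict[str, int] = {}
bag.batch_update(lambda counts: snapshot.update(counts))
```

Because the callback works on the dict directly, it is responsible for keeping counts as positive integers, and it should avoid slow or blocking work while the lock is held.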

clear() -> None

Remove all items from the bag.

copy() -> ConcurrentBag[_T]

Return a shallow copy of the ConcurrentBag.

Returns:

A new ConcurrentBag with the same items and counts.

Return type:

ConcurrentBag[_T]

count_of(item: _T) -> int

Return how many times item appears in the bag.

Parameters:

item (_T) – The item to count.

Returns:

The number of occurrences of this item.

Return type:

int

discard(item: _T) -> None

Remove one occurrence of item from the bag if present, but do nothing if the item is not in the bag.

dispose() -> None

Disposes of this ConcurrentBag, releasing all internal resources.

Responsibilities:
  • Clears the internal bag, removing all items.

  • Sets the disposed flag to True, marking this object as no longer valid.

  • Emits a warning to notify that the object has been disposed.

Behavior:
  • This method is idempotent: subsequent calls have no effect after the first.

  • No automatic usage checks are enforced after disposal; it is the user’s responsibility to avoid further operations.

Notes

  • Designed for consistency with deterministic resource management patterns seen in systems programming (e.g., RAII, IDisposable).

  • Disposal does NOT release the lock itself since locks are acquired per operation.

Example

    with ConcurrentBag(…) as bag:
        bag.add(42)

    # bag is now automatically disposed and cleared
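The disposal behavior can be sketched as follows, assuming the context manager simply calls dispose() on exit. MiniBag, its disposed attribute, and its __len__ are hypothetical; the warning mentioned above is omitted to keep the sketch short.

```python
import threading
from typing import Dict


class MiniBag:
    """Hypothetical stand-in, not the library's implementation."""

    def __init__(self) -> None:
        self._lock = threading.RLock()
        self._counts: Dict[int, int] = {}
        self.disposed = False

    def add(self, item: int) -> None:
        with self._lock:
            self._counts[item] = self._counts.get(item, 0) + 1

    def dispose(self) -> None:
        with self._lock:
            if self.disposed:
                return  # idempotent: later calls do nothing
            self._counts.clear()
            self.disposed = True

    def __enter__(self) -> "MiniBag":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        # Dispose on block exit, whether or not an exception was raised.
        self.dispose()

    def __len__(self) -> int:
        with self._lock:
            return sum(self._counts.values())


with MiniBag() as bag:
    bag.add(42)
    size_inside = len(bag)

size_after = len(bag)
bag.dispose()  # second call is a no-op
```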

filter(predicate: Callable[[_T], bool]) -> ConcurrentBag[_T]

Return a new bag containing only the items for which predicate(item) is True. The original bag is not modified.

Parameters:

predicate (Callable[[_T], bool]) – A function returning True if an item should be kept, False otherwise.

Returns:

A new bag containing only the items that passed the filter.

Return type:

ConcurrentBag[_T]
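The filtering semantics can be illustrated on a plain collections.Counter standing in for the item -> count mapping. filter_counts is a hypothetical helper, and the assumption here is that a kept item retains its full count.

```python
from collections import Counter
from typing import Callable


def filter_counts(counts: Counter, predicate: Callable[[int], bool]) -> Counter:
    # Build a new mapping; the input is left unchanged.
    return Counter({item: n for item, n in counts.items() if predicate(item)})


counts = Counter({1: 2, 2: 1, 3: 4})
odd_only = filter_counts(counts, lambda item: item % 2 == 1)
```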

map(func: Callable[[_T], _T]) -> ConcurrentBag[_T]

Apply a function to each item and return a new ConcurrentBag with the transformed items. If func maps several distinct items to the same value (func(x) == func(y)), those items are merged into a single entry in the new bag.

Parameters:

func (Callable[[_T], _T]) – A function to apply to each item.

Returns:

A new bag with the transformed items.

Return type:

ConcurrentBag[_T]
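The merging effect can be sketched on a plain collections.Counter. map_counts is a hypothetical helper, and summing the counts of items that collide after mapping is an assumption about what "merging" means, not something this page confirms.

```python
from collections import Counter
from typing import Callable


def map_counts(counts: Counter, func: Callable[[int], int]) -> Counter:
    mapped: Counter = Counter()
    for item, n in counts.items():
        # Assumption: items that map to the same value sum their counts.
        mapped[func(item)] += n
    return mapped


counts = Counter({-2: 1, 2: 3, 5: 1})
mapped = map_counts(counts, abs)  # -2 and 2 both map to 2
```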

pop() -> _T

Remove and return a single occurrence of an arbitrary item from the bag.

Raises:

KeyError – If the bag is empty.

Returns:

An item that was removed.

Return type:

_T
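One way to implement this on a dict of counts is sketched below. MiniBag is a hypothetical stand-in, and taking the first key in dict order is an assumption; callers should not rely on which item is returned.

```python
import threading
from typing import Dict, Iterable


class MiniBag:
    """Hypothetical stand-in, not the library's implementation."""

    def __init__(self, items: Iterable[str] = ()) -> None:
        self._lock = threading.RLock()
        self._counts: Dict[str, int] = {}
        for item in items:
            self._counts[item] = self._counts.get(item, 0) + 1

    def pop(self) -> str:
        with self._lock:
            if not self._counts:
                raise KeyError("pop from an empty bag")
            # Assumption: "arbitrary" means the first key in dict order.
            item = next(iter(self._counts))
            if self._counts[item] == 1:
                del self._counts[item]  # last occurrence: drop the key
            else:
                self._counts[item] -= 1
            return item


bag = MiniBag(["a", "a", "b"])
popped = [bag.pop() for _ in range(3)]

try:
    bag.pop()
    empty_raised = False
except KeyError:
    empty_raised = True
```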

reduce(func: Callable[[Any, _T], Any], initial: Any | None = None) -> Any

Apply a function of two arguments cumulatively to the items in the bag. Each item is passed to func once per occurrence, that is, as many times as its count.

Parameters:
  • func (Callable[[Any, _T], Any]) – A function taking (accumulator, item).

  • initial (Any, optional) – A starting value for the accumulator.

Returns:

The reduced value.

Return type:

Any

Raises:

TypeError – If the bag is empty and no initial value is provided.

Example

    # Summation of all numeric items in the bag:
    def add(acc, x):
        return acc + x

    total = concurrent_bag.reduce(add, 0)
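The "once per occurrence" behavior matches what functools.reduce does over Counter.elements(). The sketch below uses only the standard library, with a Counter standing in for the bag's item -> count mapping; it is an analogy, not the library's code.

```python
from collections import Counter
from functools import reduce

counts = Counter({1: 2, 10: 1})  # stand-in for the item -> count mapping


def add(acc, x):
    return acc + x


# Counter.elements() repeats each item as many times as its count.
total = reduce(add, counts.elements(), 0)

# With no initial value, an empty input raises TypeError.
try:
    reduce(add, Counter().elements())
    raised = False
except TypeError:
    raised = True
```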

remove(item: _T) -> None

Remove one occurrence of item from the bag.

Raises:

KeyError – If the item is not present in the bag.
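The difference between remove and discard can be sketched on a plain dict of counts. remove_one and discard_one are hypothetical helpers that mirror the documented behavior; locking is left out for brevity.

```python
from typing import Dict


def remove_one(counts: Dict[str, int], item: str) -> None:
    if item not in counts:
        raise KeyError(item)
    if counts[item] == 1:
        del counts[item]  # last occurrence: drop the key
    else:
        counts[item] -= 1


def discard_one(counts: Dict[str, int], item: str) -> None:
    # Same as remove_one, but a missing item is silently ignored.
    try:
        remove_one(counts, item)
    except KeyError:
        pass


counts = {"a": 2}
remove_one(counts, "a")          # one occurrence of "a" remains
discard_one(counts, "missing")   # no error, no change

try:
    remove_one(counts, "missing")
    remove_raised = False
except KeyError:
    remove_raised = True
```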

to_concurrent_dict() -> ConcurrentDict[_T, int]

Return a shallow copy of the internal dictionary (item -> count), wrapped in a ConcurrentDict.

Returns:

A concurrent dictionary of items to counts.

Return type:

ConcurrentDict[_T, int]

unique_items() -> List[_T]

Return a list of distinct items present in the bag (each item only once).

Returns:

A snapshot of all unique items.

Return type:

List[_T]
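"Snapshot" here means the returned list is a copy taken at one moment. A small sketch, assuming the keys are copied while the lock is held; the lock and counts variables are stand-ins for the bag's internals.

```python
import threading

lock = threading.RLock()
counts = {"a": 2, "b": 1}  # stand-in for the internal item -> count dict

with lock:
    unique = list(counts)  # copy the keys while holding the lock

counts["c"] = 1  # a later mutation does not affect the snapshot
```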

update(other: ConcurrentBag[_T]) -> None

Update this bag with items from another bag, adding their counts.

Parameters:

other (ConcurrentBag[_T]) – Another bag to merge into this one.
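Adding counts on merge is the same behavior as collections.Counter.update. The sketch below uses two Counters as stand-ins for the two bags; a thread-safe version would presumably perform the merge under the receiving bag's lock.

```python
from collections import Counter

mine = Counter({"a": 1, "b": 2})
other = Counter({"b": 3, "c": 1})

# Counter.update adds counts instead of replacing them.
mine.update(other)
```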