Concurrent Collection
- class thread_factory.concurrency.concurrent_collection.ConcurrentCollection(total_thread_count: int = 1, initial: Iterable[_T] | None = None)
Bases: Generic[_T], IDisposable
A thread-safe, high-level collection that distributes items across multiple internal shards (lock-protected deques). Each shard has its own lock, so multiple threads can access different shards in parallel with minimal contention.
Ordering across shards is not strictly FIFO: each shard behaves like a small FIFO queue, but the global order is only approximate.
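A minimal sketch of the sharding idea, assuming each shard is a collections.deque guarded by its own threading.Lock and that items are assigned round-robin. ShardedBag, items_in_shard_order() and the round-robin policy are hypothetical illustrations, not the library's _select_shard() logic. Reading the shards back in index order shows how per-shard FIFO order can differ from global insertion order.

```python
import threading
from collections import deque
from typing import Deque, Generic, List, TypeVar

T = TypeVar("T")


class ShardedBag(Generic[T]):
    """Hypothetical stand-in: N deques, each protected by its own lock."""

    def __init__(self, shard_count: int) -> None:
        self._shards: List[Deque[T]] = [deque() for _ in range(shard_count)]
        self._locks = [threading.Lock() for _ in range(shard_count)]
        self._cursor = 0                      # round-robin position (assumption)
        self._cursor_lock = threading.Lock()  # protects _cursor only

    def add(self, item: T) -> None:
        with self._cursor_lock:
            index = self._cursor % len(self._shards)
            self._cursor += 1
        # Only the chosen shard is locked, so adds to other shards run in parallel.
        with self._locks[index]:
            self._shards[index].append(item)

    def items_in_shard_order(self) -> List[T]:
        # Each shard is read under its own lock, one shard at a time.
        out: List[T] = []
        for lock, shard in zip(self._locks, self._shards):
            with lock:
                out.extend(shard)
        return out


bag: ShardedBag[int] = ShardedBag(2)
for i in range(4):
    bag.add(i)
print(bag.items_in_shard_order())  # [0, 2, 1, 3]: FIFO per shard, not globally
```

The single cursor lock is a small contention point of its own; it is used here only to keep the sketch short.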
- Recommended Usage:
Ideal for up to ~20 threads in total (producers + consumers).
As a rough guideline, use 1 shard per producer/consumer pair of threads.
If heavy contention or extremely high concurrency is expected, consider ConcurrentQueue or ConcurrentStack instead.
- Shard Count:
Must be at least 1.
If greater than 1, it must be even (this supports symmetrical partitioning in parts of the internal logic).
The shard count is set in the constructor and defaults to the value of the total_thread_count parameter.
- Disposal:
Implemented via dispose(), which disposes all shards and clears shared data.
Do not use the collection after it has been disposed.
Using the collection as a context manager (with ConcurrentCollection(…) as cc:) calls dispose() automatically on exit.
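The shard-count rule and the with-statement behavior above can be sketched together. This assumes the constructor receives the shard count directly (how the real class derives it from total_thread_count is not shown here) and that __exit__ simply calls dispose(); ShardedResource is a hypothetical name.

```python
import threading
from collections import deque
from types import TracebackType
from typing import Any, Deque, List, Optional, Type


class ShardedResource:
    """Hypothetical stand-in: validates the shard count and supports `with`."""

    def __init__(self, shard_count: int = 1) -> None:
        # Documented rule: at least 1 shard, and an even number when above 1.
        if shard_count < 1:
            raise ValueError(f"shard count must be at least 1, got {shard_count}")
        if shard_count > 1 and shard_count % 2 != 0:
            raise ValueError(f"shard count above 1 must be even, got {shard_count}")
        self._shards: List[Deque[Any]] = [deque() for _ in range(shard_count)]
        self._locks = [threading.Lock() for _ in range(shard_count)]
        self.disposed = False

    def dispose(self) -> None:
        for lock, shard in zip(self._locks, self._shards):
            with lock:
                shard.clear()
        self.disposed = True

    def __enter__(self) -> "ShardedResource":
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc: Optional[BaseException],
        tb: Optional[TracebackType],
    ) -> None:
        # Runs even when the with-body raises; returning None lets the
        # exception propagate.
        self.dispose()


with ShardedResource(4) as res:
    print(len(res._shards))  # 4
print(res.disposed)          # True
```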
- add(item: _T) → None
Adds (pushes) an item to the collection.
This uses _select_shard() to pick which shard to place the item into, distributing load. Each shard remains FIFO internally.
- Parameters:
item (_T) – The item to add.
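The document does not say how _select_shard() chooses a shard. One plausible strategy, shown here purely as an assumption, is to pick the least-loaded shard from a per-shard length array such as the _length_array mentioned under dispose(); select_least_loaded is a hypothetical helper.

```python
from typing import Sequence


def select_least_loaded(lengths: Sequence[int]) -> int:
    """Hypothetical _select_shard(): index of the shard holding the fewest items."""
    if not lengths:
        raise ValueError("at least one shard is required")
    # min() returns the first minimum, so ties go to the lowest shard index.
    return min(range(len(lengths)), key=lengths.__getitem__)


print(select_least_loaded([3, 1, 1, 2]))  # 1
```

Reading the lengths without holding every shard lock is racy, but a stale read only makes the balance slightly worse; it never loses an item, because the append itself still happens under the chosen shard's lock.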
- batch_update(func: Callable[[List[_T]], None]) → None
Performs a ‘batch update’ by collecting all items, passing them to the provided function, clearing the shards, then re-inserting the updated items.
- Parameters:
func (Callable[[List[_T]], None]) – A function that accepts the list of all items and modifies that list in place.
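A sketch of the batch-update sequence (collect, call func, clear, re-insert), assuming the shards are lock-protected deques. batch_update_shards is a hypothetical helper, and round-robin re-insertion stands in for whatever _select_shard() actually does. All shard locks are taken in index order via contextlib.ExitStack, so two concurrent batch updates cannot deadlock by acquiring locks in different orders.

```python
import threading
from collections import deque
from contextlib import ExitStack
from typing import Callable, Deque, List, TypeVar

T = TypeVar("T")


def batch_update_shards(
    shards: List[Deque[T]],
    locks: List[threading.Lock],
    func: Callable[[List[T]], None],
) -> None:
    with ExitStack() as stack:
        # Acquire every lock in a fixed (index) order; ExitStack releases
        # them in reverse order, even if func raises.
        for lock in locks:
            stack.enter_context(lock)
        items: List[T] = [item for shard in shards for item in shard]
        func(items)  # func edits the list in place
        for shard in shards:
            shard.clear()
        # Round-robin re-insertion (assumption, stands in for _select_shard()).
        for i, item in enumerate(items):
            shards[i % len(shards)].append(item)


shards = [deque([1, 3]), deque([2, 4])]
locks = [threading.Lock(), threading.Lock()]
batch_update_shards(shards, locks, lambda xs: xs.remove(3))
print([list(s) for s in shards])  # [[1, 4], [2]]
```

Because the shards are cleared only after func returns, an exception raised inside func leaves the original contents untouched.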
- copy() → ConcurrentCollection[_T]
Creates a shallow copy: a new ConcurrentCollection with the same shard count that holds the same item objects (the items themselves are not copied).
- Returns:
A new collection containing the current items.
- Return type:
ConcurrentCollection[_T]
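To make "shallow" concrete: a copy gets new containers but shares the item objects. A minimal sketch, assuming lock-protected deque shards; shallow_copy_shards is a hypothetical helper. Because each shard is locked only while it is being copied, the result is consistent per shard but is not an atomic snapshot of the whole collection (whether the real copy() behaves the same way is an assumption).

```python
import threading
from collections import deque
from typing import Any, Deque, List


def shallow_copy_shards(
    shards: List[Deque[Any]], locks: List[threading.Lock]
) -> List[Deque[Any]]:
    copies: List[Deque[Any]] = []
    for lock, shard in zip(locks, shards):
        with lock:  # hold one shard's lock while copying that shard
            copies.append(deque(shard))  # new deque, same item references
    return copies


record = {"count": 1}
shards = [deque([record]), deque([2])]
copies = shallow_copy_shards(shards, [threading.Lock(), threading.Lock()])
record["count"] = 99
print(copies[0][0]["count"])  # 99: the copy shares the dict with the original
copies[1].append(3)
print(list(shards[1]))        # [2]: the containers themselves are independent
```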
- dispose() → None
Disposes of this ConcurrentCollection and releases its resources.
- Responsibilities:
Disposes all internal shards, which will clear their internal queues and reset their length counters.
Resets the internal _length_array to zeroed values.
Marks the collection as disposed via the self.disposed flag.
- Behavior:
This method is idempotent. Calling dispose() multiple times is safe and will have no effect after the first call.
Once disposed, the collection is considered invalid and should not be used further.
This follows a typical deterministic disposal pattern (inspired by .NET’s IDisposable), ensuring explicit control over resource lifetime.
Notes
Unlike some patterns, this implementation does NOT prevent method calls after disposal. It is the user’s responsibility to ensure that no further use is made of the object after it is disposed.
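A minimal sketch of the dispose() contract described above: idempotent, clears every shard, zeroes a per-shard length array, sets disposed, and does not guard later calls. ShardedStore is hypothetical, and modelling _length_array as one integer per shard is an assumption.

```python
import threading
from collections import deque
from typing import Any, Deque, List


class ShardedStore:
    def __init__(self, shard_count: int) -> None:
        self._shards: List[Deque[Any]] = [deque() for _ in range(shard_count)]
        self._locks = [threading.Lock() for _ in range(shard_count)]
        self._length_array = [0] * shard_count  # assumption: one counter per shard
        self._dispose_lock = threading.Lock()
        self.disposed = False

    def add(self, item: Any) -> None:
        # Always shard 0 to keep the sketch short. Deliberately no
        # "if self.disposed: raise" guard, matching the Notes above.
        with self._locks[0]:
            self._shards[0].append(item)
            self._length_array[0] += 1

    def dispose(self) -> None:
        with self._dispose_lock:  # serialize concurrent dispose() calls
            if self.disposed:
                return  # idempotent: every call after the first is a no-op
            for i, (lock, shard) in enumerate(zip(self._locks, self._shards)):
                with lock:
                    shard.clear()
                    self._length_array[i] = 0
            self.disposed = True


store = ShardedStore(2)
store.add("a")
store.dispose()
store.dispose()        # safe second call
store.add("b")         # not prevented: avoiding this is the caller's job
print(list(store._shards[0]))  # ['b']
```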
- filter(func: Callable[[_T], bool]) → ConcurrentCollection[_T]
Constructs a new collection containing only items for which func(item) is True.
- Parameters:
func (Callable[[_T], bool]) – The predicate to apply.
- Returns:
A new collection with the filtered items.
- Return type:
ConcurrentCollection[_T]
- map(func: Callable[[_T], Any]) → ConcurrentCollection[Any]
Constructs a new collection by applying func to each item in the current collection.
- Parameters:
func (Callable[[_T], Any]) – The transformation function.
- Returns:
A new collection with mapped items.
- Return type:
ConcurrentCollection[Any]
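filter() and map() both read every item and build a new collection from the results. A minimal sketch, assuming lock-protected deque shards: snapshot, filter_items and map_items are hypothetical helpers that return plain lists (the real methods return a new ConcurrentCollection). Running func outside the locks is a design choice of this sketch, so a slow predicate or transform never blocks producers.

```python
import threading
from collections import deque
from typing import Any, Callable, Deque, List, TypeVar

T = TypeVar("T")


def snapshot(shards: List[Deque[T]], locks: List[threading.Lock]) -> List[T]:
    # Copy items shard by shard, holding each lock only while copying that shard.
    items: List[T] = []
    for lock, shard in zip(locks, shards):
        with lock:
            items.extend(shard)
    return items


def filter_items(
    shards: List[Deque[T]], locks: List[threading.Lock], pred: Callable[[T], bool]
) -> List[T]:
    return [x for x in snapshot(shards, locks) if pred(x)]  # pred runs unlocked


def map_items(
    shards: List[Deque[T]], locks: List[threading.Lock], func: Callable[[T], Any]
) -> List[Any]:
    return [func(x) for x in snapshot(shards, locks)]  # func runs unlocked


shards = [deque([1, 3]), deque([2, 4])]
locks = [threading.Lock(), threading.Lock()]
print(filter_items(shards, locks, lambda x: x % 2 == 0))  # [2, 4]
print(map_items(shards, locks, lambda x: x * 10))         # [10, 30, 20, 40]
```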
- peek(shard_index: int | None = None) → _T
Returns (but does not remove) an item from the collection. If shard_index is given, attempts to peek specifically at that shard. Otherwise, scans shards in order, returning from the first non-empty shard.
- Parameters:
shard_index (Optional[int], optional) – The index of the shard to peek at. If None, the first non-empty shard is used.
- Raises:
IndexError – If shard_index is out of range.
Empty – If the chosen shard (or all shards) is empty.
- Returns:
An item from the front of a shard.
- Return type:
_T
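A sketch of the documented peek() rules, assuming lock-protected deque shards and that the Empty exception is queue.Empty (the document does not say which module it comes from). peek_shards is hypothetical, and rejecting negative indices is an assumption. Another thread may remove the returned item as soon as the lock is released, so a peek is only a hint.

```python
import threading
from collections import deque
from queue import Empty  # assumption: the documented Empty is queue.Empty
from typing import Deque, List, Optional, TypeVar

T = TypeVar("T")


def peek_shards(
    shards: List[Deque[T]],
    locks: List[threading.Lock],
    shard_index: Optional[int] = None,
) -> T:
    if shard_index is not None:
        if not 0 <= shard_index < len(shards):
            raise IndexError(f"shard_index {shard_index} out of range")
        with locks[shard_index]:
            if not shards[shard_index]:
                raise Empty
            return shards[shard_index][0]  # front item, left in place
    for lock, shard in zip(locks, shards):  # first non-empty shard wins
        with lock:
            if shard:
                return shard[0]
    raise Empty


shards = [deque(), deque(["x", "y"])]
locks = [threading.Lock(), threading.Lock()]
print(peek_shards(shards, locks))     # x
print(peek_shards(shards, locks, 1))  # x
print(len(shards[1]))                 # 2: nothing was removed
```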
- pop() → _T
Removes and returns an item from the collection (like dequeue). It attempts a short scan across shards to find any non-empty one.
- Raises:
Empty – If all shards are empty.
- Returns:
The item from one of the shards.
- Return type:
_T
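One possible shape for the "short scan", sketched under assumptions: each consumer passes a start index (hypothetically, its own preferred shard) and the scan wraps around once, so consumers do not all contend on shard 0. pop_shards and its start parameter are hypothetical, and Empty is again assumed to be queue.Empty.

```python
import threading
from collections import deque
from queue import Empty  # assumption: the documented Empty is queue.Empty
from typing import Deque, List, TypeVar

T = TypeVar("T")


def pop_shards(shards: List[Deque[T]], locks: List[threading.Lock], start: int = 0) -> T:
    n = len(shards)
    for offset in range(n):  # visit each shard exactly once
        i = (start + offset) % n
        with locks[i]:
            if shards[i]:
                return shards[i].popleft()  # oldest item in that shard
    raise Empty


shards = [deque([1]), deque([2, 3])]
locks = [threading.Lock(), threading.Lock()]
print([pop_shards(shards, locks, start=1) for _ in range(3)])  # [2, 3, 1]
```

Raising Empty only means every shard was empty when it was visited; a producer may have added to an already-scanned shard in the meantime.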
- reduce(func: Callable[[Any, _T], Any], initial: Any | None = None) → Any
Reduces the collection into a single value by iterating over all items in shard order and applying func(acc, item) cumulatively.
- Parameters:
func (Callable[[Any, _T], Any]) – A two-argument function with signature func(accumulator, item).
initial (Optional[Any], optional) – An initial accumulator value. If None and the collection is empty, a TypeError is raised (mirroring functools.reduce).
- Returns:
The final accumulated result.
- Return type:
Any
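The initial=None convention can be sketched on top of functools.reduce. reduce_items is a hypothetical helper that takes the items already gathered in shard order (locking is omitted to keep the focus on initial). Because None means "no initial value", None itself cannot be supplied as a real starting accumulator.

```python
import operator
from functools import reduce
from typing import Any, Callable, Iterable, Optional


def reduce_items(
    items: Iterable[Any],
    func: Callable[[Any, Any], Any],
    initial: Optional[Any] = None,
) -> Any:
    if initial is None:
        return reduce(func, items)  # raises TypeError if items is empty
    return reduce(func, items, initial)


shard_items = [1, 2] + [3]  # shard 0's items, then shard 1's
print(reduce_items(shard_items, operator.add))      # 6
print(reduce_items(shard_items, operator.add, 10))  # 16
print(reduce_items([], operator.add, 0))            # 0
```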
- remove_item(item: _T) → bool
Removes the first occurrence of item from the collection, comparing by identity (is) rather than equality (==).
This is an O(n) operation since it reconstructs the entire collection.
- Parameters:
item (_T) – The item to remove.
- Returns:
True if the item was found and removed, False otherwise.
- Return type:
bool
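The identity rule matters for values that compare equal but are different objects. A sketch, assuming lock-protected deque shards; remove_by_identity is hypothetical and edits the matching shard in place rather than reconstructing the whole collection as the document says the real method does, though both approaches are O(n).

```python
import threading
from collections import deque
from typing import Any, Deque, List


def remove_by_identity(
    shards: List[Deque[Any]], locks: List[threading.Lock], item: Any
) -> bool:
    for lock, shard in zip(locks, shards):
        with lock:
            # Find the first element that *is* item (== is not enough).
            index = next((i for i, c in enumerate(shard) if c is item), None)
            if index is not None:
                del shard[index]
                return True
    return False


a, b = [1], [1]  # equal, but two distinct objects
shards = [deque([b]), deque([a])]
locks = [threading.Lock(), threading.Lock()]
print(remove_by_identity(shards, locks, a))  # True: removes a from shard 1
print(list(shards[0]))                       # [[1]]: b is untouched
print(remove_by_identity(shards, locks, a))  # False: a is already gone
```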
- to_concurrent_list() → ConcurrentList[_T]
Converts the collection’s contents into a ConcurrentList.
- Returns:
A new concurrent list containing all items.
- Return type:
ConcurrentList[_T]