Concurrent Buffer
- class thread_factory.concurrency.concurrent_buffer.ConcurrentBuffer(number_of_shards: int = 4, initial: Iterable[_T] | None = None)
Bases: Generic[_T], IDisposable
A thread-safe, mostly FIFO buffer implementation using multiple internal deques (shards). Items are tagged with a timestamp upon enqueue.
This buffer aims to provide better concurrency than a single-lock queue in low to moderate contention scenarios by distributing items across multiple internal shards, each with its own lock.
This buffer does NOT guarantee strict FIFO ordering across shards. It generally performs well under moderate concurrency; for extreme concurrency or high contention, consider other structures.
NOTE: This buffer is not designed for high-contention scenarios; ConcurrentQueue or ConcurrentStack outperform it under heavy contention. DO NOT EXCEED 20 THREADS OVERALL (producers plus consumers) WHEN USING THIS OBJECT.
As a rule of thumb, use half as many shards as total threads (producers + consumers), e.g. 10 threads => 5 shards.
This class implements the Disposable pattern, so you can dispose of it explicitly or via a with statement when it is no longer needed.
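The sharding idea described above can be sketched with the standard library alone. This is a minimal illustration, not the library's implementation: ShardedBufferSketch, suggested_shards, the round-robin shard choice, and the use of queue.Empty are all assumptions made for the example.

```python
import itertools
import threading
import time
from collections import deque
from queue import Empty  # stand-in; the library may define its own Empty


def suggested_shards(total_threads: int) -> int:
    """Rule of thumb from the text: half as many shards as total threads."""
    return max(1, total_threads // 2)


class ShardedBufferSketch:
    """Illustrative stand-in, NOT the library's ConcurrentBuffer."""

    def __init__(self, number_of_shards: int = 4):
        self._shards = [deque() for _ in range(number_of_shards)]
        self._locks = [threading.Lock() for _ in range(number_of_shards)]
        self._next_shard = itertools.count()  # round-robin choice (assumption)

    def enqueue(self, item):
        index = next(self._next_shard) % len(self._shards)
        with self._locks[index]:
            # Tag inside the lock so each shard stays ordered by timestamp.
            self._shards[index].append((time.monotonic_ns(), item))

    def dequeue(self):
        while True:
            # Scan every shard head and remember the smallest timestamp.
            best_index, best_stamp = None, None
            for index, shard in enumerate(self._shards):
                with self._locks[index]:
                    if shard and (best_stamp is None or shard[0][0] < best_stamp):
                        best_index, best_stamp = index, shard[0][0]
            if best_index is None:
                raise Empty
            with self._locks[best_index]:
                shard = self._shards[best_index]
                # Another thread may have taken this head since the scan;
                # if so, loop and scan again.
                if shard and shard[0][0] == best_stamp:
                    return shard.popleft()[1]


buffer = ShardedBufferSketch(number_of_shards=suggested_shards(8))
for value in range(10):
    buffer.enqueue(value)
drained = [buffer.dequeue() for _ in range(10)]
```

Because the head scan and the pop are separate steps, two consumers can interleave, which is one way a design like this ends up "mostly FIFO" rather than strictly FIFO.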
- batch_update(func: Callable[[List[_T]], None]) -> None
Applies a function to all items in the buffer as a batch, then clears and re-enqueues the updated items.
- Parameters:
func (Callable[[List[_T]], None]) – A function that takes a list of items and modifies it in place.
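The callback shape matters here: the documented type returns None, so the function is expected to mutate the list it receives. The sketch below shows such a callback applied to a plain list standing in for the buffer's items; clamp_negative_to_zero and snapshot are hypothetical names, and the buffer itself is not used.

```python
from typing import List


def clamp_negative_to_zero(items: List[int]) -> None:
    """Mutates the list in place and returns None, matching the callback type."""
    for position, value in enumerate(items):
        if value < 0:
            items[position] = 0


# Plain list standing in for the items handed to the callback.
snapshot = [3, -1, 4, -5]
clamp_negative_to_zero(snapshot)
```

A callback that builds and returns a new list instead of mutating its argument would presumably have no effect, since the return value is typed as None.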
- copy() -> ConcurrentBuffer[_T]
Creates a shallow copy of the ConcurrentBuffer.
- Returns:
A new ConcurrentBuffer with the same items.
- Return type:
ConcurrentBuffer[_T]
- dequeue() -> _T
Removes and returns the oldest item from the buffer based on the timestamp at the head of each shard.
- Raises:
Empty – If the buffer is empty.
- Returns:
The oldest item in the buffer.
- Return type:
_T
- dispose() -> None
Disposes of this ConcurrentBuffer, releasing all internal resources.
- Responsibilities:
Disposes each internal shard by clearing its queue and resetting its counters.
Resets the shared _length_array and _time_array used for shard coordination.
Marks this object as disposed (self.disposed = True).
- Behavior:
This method is idempotent: multiple calls will have no adverse effects after the first.
Once disposed, the buffer should be considered permanently invalid.
No post-disposal protection is enforced; correct usage is the caller’s responsibility.
Notes
A warning is emitted when disposal occurs to signal that the buffer has been destroyed.
This follows the explicit resource control philosophy common in high-performance and systems programming.
Example
with ConcurrentBuffer(…) as buf:
    …
# buffer is automatically disposed here
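The disposal behavior listed above (idempotent, flagged through a disposed attribute, announced with a warning, triggered on leaving a with block) can be sketched generically as follows. DisposableSketch is a hypothetical class written for illustration; it is not the library's IDisposable, and the warning text is invented.

```python
import warnings


class DisposableSketch:
    """Hypothetical illustration of the pattern; not the library's class."""

    def __init__(self):
        self.items = [1, 2, 3]
        self.disposed = False

    def dispose(self) -> None:
        if self.disposed:  # idempotent: later calls do nothing
            return
        self.items.clear()
        self.disposed = True
        warnings.warn("DisposableSketch has been disposed", stacklevel=2)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.dispose()
        return False  # do not swallow exceptions raised in the block


with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    with DisposableSketch() as resource:
        resource.items.append(4)
    resource.dispose()  # second call is a no-op and emits no warning
```

As in the documented behavior, nothing in this sketch stops a caller from touching resource.items after disposal; avoiding that is left to the caller.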
- enqueue(item: _T) -> None
Adds a new item to the buffer.
- Parameters:
item (_T) – The item to add.
- filter(func: Callable[[_T], bool]) -> ConcurrentBuffer[_T]
Filters the items in the buffer based on a given predicate and returns a new ConcurrentBuffer with the filtered items.
- Parameters:
func (Callable[[_T], bool]) – The predicate function to filter items.
- Returns:
A new ConcurrentBuffer with the filtered items.
- Return type:
ConcurrentBuffer[_T]
- map(func: Callable[[_T], Any]) -> ConcurrentBuffer[Any]
Applies a function to each item in the buffer and returns a new ConcurrentBuffer with the transformed items.
- Parameters:
func (Callable[[_T], Any]) – The function to apply to each item.
- Returns:
A new ConcurrentBuffer with the mapped items.
- Return type:
ConcurrentBuffer[Any]
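Both filter and map are documented as returning a new container rather than changing the original. The sketch below shows that non-mutating contract on plain lists; filter_sketch and map_sketch are hypothetical helpers, and a real buffer may yield items in an order that is not strictly FIFO.

```python
from typing import Any, Callable, List, TypeVar

T = TypeVar("T")


def filter_sketch(items: List[T], func: Callable[[T], bool]) -> List[T]:
    """Returns a new list of the items for which the predicate is true."""
    return [item for item in items if func(item)]


def map_sketch(items: List[T], func: Callable[[T], Any]) -> List[Any]:
    """Returns a new list holding func(item) for every item."""
    return [func(item) for item in items]


source = [1, 2, 3, 4, 5, 6]
evens = filter_sketch(source, lambda value: value % 2 == 0)
squares = map_sketch(evens, lambda value: value * value)
```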
- peek(index: int | None = None) -> _T
Returns the oldest item from the buffer without removing it.
If an index is provided, this peeks into the specified shard. Otherwise, it finds the shard holding the overall oldest item.
- Parameters:
index (Optional[int], optional) – The index of the shard to peek into. If None, returns the oldest item across all shards.
- Raises:
Empty – If the buffer is empty or the specified shard is empty.
- Returns:
The oldest item in the buffer, or in the specified shard when an index is given, returned as a copy.
- Return type:
_T
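The two peek modes can be sketched over a list of deques holding (timestamp, item) pairs. This is an assumption-laden illustration: peek_sketch is a hypothetical function, queue.Empty stands in for the documented Empty, and the locking and copying that the real method performs are omitted.

```python
from collections import deque
from queue import Empty  # stand-in; the library may define its own Empty


def peek_sketch(shards, index=None):
    """Returns the head item of one shard, or the oldest head across all shards."""
    if index is not None:
        if not shards[index]:
            raise Empty
        return shards[index][0][1]
    heads = [shard[0] for shard in shards if shard]
    if not heads:
        raise Empty
    # Each entry is (timestamp, item); the smallest timestamp is the oldest.
    return min(heads, key=lambda entry: entry[0])[1]


shards = [
    deque([(10, "a"), (40, "d")]),
    deque([(5, "b")]),
    deque(),
]
oldest_overall = peek_sketch(shards)
oldest_in_shard_zero = peek_sketch(shards, 0)
```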
- peek_oldest() -> _T
Returns the oldest item across all shards without removing it.
- Raises:
Empty – If the buffer is empty.
- Returns:
The oldest item in the buffer (copied).
- Return type:
_T
- reduce(func: Callable[[Any, _T], Any], initial: Any | None = None) -> Any
Applies a function of two arguments cumulatively to the items of the buffer, from left to right, to reduce the buffer to a single value.
- Parameters:
func (Callable[[Any, _T], Any]) – The function to apply, taking the accumulator and the current item.
initial (Optional[Any], optional) – The initial value for the accumulator. Defaults to None.
- Raises:
TypeError – If the buffer is empty and no initial value is provided.
- Returns:
The reduced value.
- Return type:
Any
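The description closely mirrors the standard library's functools.reduce, including the TypeError on an empty input with no initial value. The sketch below uses functools.reduce on plain lists as a stand-in for the buffer; whether the buffer's method delegates to it is an assumption.

```python
from functools import reduce

items = [1, 2, 3, 4]

# With an initial value, the accumulator starts at 0.
total = reduce(lambda acc, item: acc + item, items, 0)

# Without one, the first item seeds the accumulator.
total_without_initial = reduce(lambda acc, item: acc + item, items)

# Empty input and no initial value: functools.reduce raises TypeError.
try:
    reduce(lambda acc, item: acc + item, [])
    empty_error = None
except TypeError as error:
    empty_error = error
```

Since the documented default for initial is None, an explicit None is presumably treated the same as omitting the argument, unlike functools.reduce, which accepts None as a real starting value.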
- remove_item(item: _T) -> bool
Removes the first occurrence of a specific item from the buffer.
- Parameters:
item (_T) – The item to remove.
- Returns:
True if the item was found and removed, False otherwise.
- Return type:
bool
- to_concurrent_list() -> ConcurrentList[_T]
Converts the buffer’s contents to a ConcurrentList.
- Returns:
A new ConcurrentList containing the same items.
- Return type:
ConcurrentList[_T]