Concurrent Set

class thread_factory.concurrency.concurrent_set.ConcurrentSet(initial: Iterable[_T] | None = None)

Bases: Generic[_T], IDisposable

Thread‑safe, optionally freezable hash‑set implementation.

This class provides a concurrent-safe wrapper around Python’s built-in set. It uses a reentrant lock (threading.RLock) to protect the underlying set during mutations, ensuring that multiple threads can safely interact with the set without data corruption.

Key Design Points:

  • RLock + freeze flag: Thread safety rests on a threading.RLock. To optimize read-heavy workloads, the class adds a _freeze flag. Once the set is frozen via freeze(), read operations (such as membership checks and iteration) proceed without acquiring the lock, which removes lock contention between concurrent readers. Mutating operations (add, remove, etc.) raise a TypeError while the set is frozen, so incorrect use of a frozen set surfaces immediately.

  • Rich set algebra: Standard set operations, both those returning new sets (|, &, -, ^) and those modifying the set in-place (|=, &=, -=, ^=), are implemented or forwarded to internal helpers. These implementations ensure that the operations are performed safely under the lock and respect the frozen state.

  • Disposable / context‑manager: The class implements the IDisposable interface, so the internal set’s data can be released explicitly with dispose(). It also supports the context manager protocol (with ConcurrentSet(…) as cs:). Using the context manager for normal operations is strongly discouraged, because it exposes the raw internal set and bypasses the thread-safe interface; it is intended for advanced scenarios and explicit resource management patterns.
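The RLock + freeze flag design described above can be sketched roughly as follows. This is a minimal illustration, not the library's source: the class name FreezableSetSketch and its internals are hypothetical, and the sketch chooses to check the flag inside the lock for mutations.

```python
import threading
from typing import Generic, Iterable, Optional, Set, TypeVar

_T = TypeVar("_T")


class FreezableSetSketch(Generic[_T]):
    """Hypothetical stand-in for illustration; not ConcurrentSet itself."""

    def __init__(self, initial: Optional[Iterable[_T]] = None) -> None:
        self._lock = threading.RLock()
        self._set: Set[_T] = set(initial) if initial is not None else set()
        self._freeze = False

    def freeze(self) -> None:
        # The flag is only changed while the lock is held.
        with self._lock:
            self._freeze = True

    def unfreeze(self) -> None:
        with self._lock:
            self._freeze = False

    def add(self, item: _T) -> None:
        with self._lock:
            # Mutations are rejected while frozen (sketch choice: check under the lock).
            if self._freeze:
                raise TypeError("cannot mutate a frozen set")
            self._set.add(item)

    def __contains__(self, item: object) -> bool:
        # Frozen: contents cannot change, so read without locking.
        if self._freeze:
            return item in self._set
        # Not frozen: take the lock so the read cannot race a mutation.
        with self._lock:
            return item in self._set
```

The trade-off is that a frozen set gives up mutation entirely in exchange for reads that never wait on the lock.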

add(item: _T) → None

Add item to the set.

If the item is already present, this method has no effect. This is consistent with the behavior of Python’s built-in set.add().

This operation is thread-safe as it first checks the freeze state and then acquires the internal lock before performing the addition.

batch_update(func: Callable[[Set[_T]], None])

Atomically perform many mutations in one lock acquisition.

This method runs a series of modifications to the underlying set while holding the lock for the whole batch. This is more efficient than acquiring and releasing the lock for each individual operation, especially for bulk changes.

The provided callable func takes the raw internal set (self._set) as its only argument. You can then perform multiple operations directly on this set within the func.

IMPORTANT: Because func receives the raw internal set, you are responsible for ensuring that the operations performed inside func maintain set invariants and are safe in the context of being executed while the lock is held. Avoid lengthy or blocking operations inside func.

Parameters:

func – A callable that takes one argument, which is the raw internal built-in set, and performs mutations on it. The function should not return a value (or return None).
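As a rough sketch of the pattern, the stand-in class below hands its raw set to a callback under a single lock acquisition. BatchSetSketch and keep_evens are hypothetical names used only for illustration; the real method may differ in details such as how it treats the frozen state.

```python
import threading
from typing import Callable, Set


class BatchSetSketch:
    """Hypothetical stand-in showing only the batch_update pattern."""

    def __init__(self) -> None:
        self._lock = threading.RLock()
        self._set: Set[int] = set()

    def batch_update(self, func: Callable[[Set[int]], None]) -> None:
        # One lock acquisition covers every mutation performed by func.
        with self._lock:
            func(self._set)

    def to_set(self) -> Set[int]:
        # Return a copy so callers never hold the raw set.
        with self._lock:
            return set(self._set)


def keep_evens(raw: Set[int]) -> None:
    # Several mutations on the raw built-in set, all inside the same lock hold.
    raw.update(range(10))
    raw.difference_update({n for n in raw if n % 2})


cs = BatchSetSketch()
cs.batch_update(keep_evens)
batch_result = cs.to_set()
```

No other thread can observe the intermediate state between the update and the removal of odd numbers, which is the point of batching.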

clear() → None

Remove all elements from the set.

After calling this method, the set will be empty.

This operation is thread-safe as it first checks the freeze state and then acquires the internal lock before clearing the set.

difference(*others: Iterable[_T]) → ConcurrentSet[_T]

Return the difference (-) between self and all provided iterables.

This method calculates the difference between the current set and one or more other iterables. The result contains the elements of this set that are not in any of the other iterables. The original set is not modified.

Parameters:

*others – One or more iterables containing elements to exclude from the result.

Returns:

A new ConcurrentSet containing elements that are present in this set but NOT in any of the iterables in others.

discard(item: _T) → None

Remove item from the set if it is present.

If the item is not present, this method does nothing and does not raise an error (unlike remove()). This behavior matches Python’s built-in set.discard().

This operation is thread-safe as it first checks the freeze state and then acquires the internal lock before attempting the discard.

dispose() → None

Clear internal data and mark the ConcurrentSet as disposed.

This method releases the resources held by the set, primarily by clearing the underlying built-in set. Once disposed, the set should not be used further.

This method is idempotent; calling it multiple times has no additional effect after the first call. It is also thread-safe, using the internal lock to protect the clearing operation and the disposed flag update.

filter(func: Callable[[_T], bool]) → ConcurrentSet[_T]

Filter elements based on a predicate function and return a new ConcurrentSet.

This method iterates over a snapshot of the set, applies the provided predicate function func to each element, and includes only those elements for which func returns True in a new set. The original set is not modified.

Parameters:

func – A callable that takes one argument (an element from the set) and returns a boolean (True to keep the element, False to discard).

Returns:

A new ConcurrentSet containing only the elements from the original set for which func returned True. The order of elements in the resulting set is arbitrary.
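The snapshot-then-filter idea can be sketched with a plain lock and a built-in set. The names lock, shared, and filter_snapshot are hypothetical, and running the predicate outside the lock is an assumption of this sketch rather than a documented property of the library.

```python
import threading
from typing import Callable, Set

lock = threading.RLock()
shared: Set[int] = {1, 2, 3, 4, 5, 6}


def filter_snapshot(pred: Callable[[int], bool]) -> Set[int]:
    # Copy under the lock so the iteration below sees a stable view.
    with lock:
        snapshot = set(shared)
    # Apply the predicate to the copy; the shared set is never modified.
    return {x for x in snapshot if pred(x)}


evens = filter_snapshot(lambda x: x % 2 == 0)
```

Because the predicate only sees the copy, later mutations of the shared set do not affect a filter call that is already in progress.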

freeze() → None

Freeze the set.

Once frozen, attempts to call mutating methods (add, remove, update, in-place algebra operators like |=, etc.) will result in a TypeError. This state indicates that the set is intended for read-only access.

The primary benefit of freezing is enabling lock-free reads. When the set is frozen, methods like __contains__, __len__, and __iter__ directly access the internal _set without acquiring the lock, as its contents are guaranteed not to change while frozen.

This method is thread-safe as it acquires the internal lock to update the _freeze flag atomically.

intersection(*others: Iterable[_T]) → ConcurrentSet[_T]

Return the intersection (&) of all provided iterables and self.

This method calculates the intersection of the current set with one or more other iterables. It does not modify the original set.

Parameters:

*others – One or more iterables containing elements to intersect with.

Returns:

A new ConcurrentSet containing elements that are present in this set AND in every iterable in others.

property is_frozen: bool

Return True if the set is currently frozen (read-only mode).

This property reports the current state of the freeze flag. It does not acquire the internal lock: reading a boolean attribute such as _freeze is effectively atomic in CPython, and the flag is only changed while the lock is held in freeze() and unfreeze(). Checking the flag itself is therefore safe without a lock.

map(func: Callable[[_T], Any]) → ConcurrentSet[Any]

Apply a function to each element in the set and return a new ConcurrentSet.

This method iterates over a snapshot of the set, applies the provided function func to each element, and collects the results into a new set. The original set is not modified.

Parameters:

func – A callable that takes one argument (an element from the set) and returns a value.

Returns:

A new ConcurrentSet containing the results of applying func to each element of the original set. The order of elements in the resulting set is arbitrary.
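One consequence of collecting results into a set is that distinct inputs mapping to the same value collapse into a single element. The sketch below shows this with a built-in set comprehension as a stand-in for map(); the variable names are illustrative only.

```python
# Three distinct inputs, two of which map to the same lowercase value.
words = {"Apple", "apple", "PEAR"}

# Stand-in for cs.map(str.lower): results land in a set, so duplicates merge.
lowered = {w.lower() for w in words}
```

Here the result has fewer elements than the input, so the size of a mapped set should not be assumed to match the original.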

reduce(func: Callable[[Any, _T], Any], initial: Any | None = None) → Any

Apply a function cumulatively to the elements of the set.

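This method applies the func callable of two arguments (accumulator, element) sequentially to the elements of the set, so as to reduce the set to a single value. The first argument to func is the accumulated value, and the second is the current element from the set. Because sets are unordered, elements are visited in arbitrary iteration order; func should be order-independent (for example, addition or max) if a deterministic result is required.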

Parameters:
  • func – A callable that takes two arguments (accumulator, current_element) and returns a new accumulated value.

  • initial – An optional initial value for the accumulator. If initial is not provided, the first element of the set is used as the initial value, and the reduction starts from the second element.

Returns:

The single accumulated value resulting from the reduction.

Raises:

TypeError – If the set is empty and no initial value is provided.
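The described behavior mirrors the standard library's functools.reduce, which the following sketch uses on a built-in set as a stand-in. Whether the library delegates to functools.reduce internally is an assumption, not something this page states.

```python
from functools import reduce

values = {1, 2, 3, 4}

# Without an initial value, the first element visited seeds the accumulator.
total = reduce(lambda acc, x: acc + x, values)

# With an initial value, accumulation starts from it.
total_from_100 = reduce(lambda acc, x: acc + x, values, 100)

# An empty input with no initial value raises TypeError.
try:
    reduce(lambda acc, x: acc + x, set())
    empty_error = None
except TypeError as exc:
    empty_error = exc
```

Addition is used here because it gives the same result regardless of the order in which the set yields its elements.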

remove(item: _T) → None

Remove item from the set.

Raises KeyError if the item is not present in the set. This behavior matches Python’s built-in set.remove().

This operation is thread-safe as it first checks the freeze state and then acquires the internal lock before performing the removal.
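Since remove() and discard() are documented as matching the built-in set methods, the difference between them can be illustrated with a plain set. The variable names below are illustrative only.

```python
tags = {"a", "b"}

# discard() silently ignores a missing element.
tags.discard("missing")

# remove() raises KeyError for a missing element.
try:
    tags.remove("missing")
    raised_key_error = False
except KeyError:
    raised_key_error = True

# remove() on a present element deletes it.
tags.remove("a")
```

Prefer discard() when absence is an expected case, and remove() when a missing element indicates a bug.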

symmetric_difference(other: Iterable[_T]) → ConcurrentSet[_T]

Return the symmetric difference (^) with other.

This method calculates the symmetric difference between the current set and exactly one other iterable. The result contains elements that are in either collection, but not in both. The original set is not modified.

Parameters:

other – The iterable containing elements for the symmetric difference.

Returns:

A new ConcurrentSet containing elements that are in either this set OR the other iterable, but not in both.

to_concurrent_list() → ConcurrentList[_T]

Convert the ConcurrentSet to a ConcurrentList.

This method creates a new ConcurrentList containing the elements of this set. Because sets are unordered, the order of elements in the resulting ConcurrentList is arbitrary and may differ between calls or executions, just as when converting a built-in set with list(my_set).

Requires the ConcurrentList class to be available (successfully imported).

Returns:

A new ConcurrentList instance containing the elements of this set.

Raises:

ImportError – If the ConcurrentList class could not be imported when this module was loaded.

to_set() → Set[_T]

Return a shallow copy of the underlying set.

This method provides a way to get a snapshot of the set’s current contents as a standard Python set. The returned set is a separate object and is always safe to mutate without affecting the original ConcurrentSet.

The copy is made under the lock when the set is not frozen, which ensures a consistent snapshot. When the set is frozen, the internal set is copied without locking, which is safe because its contents cannot change.

Returns:

A shallow copy of the internal built-in set.
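The independence of the returned copy can be illustrated with built-in sets, where set(original) plays the role of to_set(). The names are illustrative; note that a shallow copy still shares the element objects themselves.

```python
original = {"a", "b"}

# Stand-in for cs.to_set(): a new set object with the same elements.
snapshot = set(original)

# Changes to the snapshot do not reach the original, and vice versa.
snapshot.add("c")
original.discard("a")
```

A snapshot like this is also a convenient way to iterate without holding up writers, at the cost of one copy.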

unfreeze() → None

Unfreeze the set, re-enabling mutations.

After calling unfreeze(), mutating methods can be called again. Read operations will revert to acquiring the lock (or creating a locked copy) to ensure thread safety, as the underlying set’s contents may now change due to concurrent mutations.

This method is thread-safe as it acquires the internal lock to update the _freeze flag atomically.

union(*others: Iterable[_T]) → ConcurrentSet[_T]

Return the union (|) of all provided iterables and self.

This method calculates the union of the current set with one or more other iterables. It does not modify the original set.

Parameters:

*others – One or more iterables containing elements to include in the union.

Returns:

A new ConcurrentSet containing all unique elements from this set and from every iterable in others.
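The semantics of union(), intersection(), difference(), and symmetric_difference() as documented here follow the built-in set methods of the same names, so a built-in set can serve as a quick reference. This sketch assumes that correspondence and uses illustrative values only.

```python
base = {1, 2, 3, 4}

# Each method accepts arbitrary iterables, not just sets.
u = base.union([4, 5], (6,))                 # everything from all inputs
i = base.intersection([1, 2, 3], [2, 3, 9])  # present in base and in every input
d = base.difference([1], [4, 10])            # in base, absent from every input
s = base.symmetric_difference([3, 4, 5])     # in exactly one of the two
```

None of these calls modifies base; each returns a new set.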

update(other: Iterable[_T]) → None

In‑place union with other.

Adds all elements from the other iterable to this set. This is equivalent to the |= operator for sets.

This operation is thread-safe as it first checks the freeze state and then acquires the internal lock before performing the update.
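To illustrate what lock-protected in-place union looks like under contention, the sketch below has several threads merge disjoint ranges into one shared built-in set through a single RLock. The names lock, merged, and locked_update are hypothetical stand-ins for the library's internals.

```python
import threading
from typing import Iterable, Set

lock = threading.RLock()
merged: Set[int] = set()


def locked_update(items: Iterable[int]) -> None:
    # Each thread holds the lock for the duration of its in-place union.
    with lock:
        merged.update(items)


# Eight threads, each contributing 100 distinct integers.
threads = [
    threading.Thread(target=locked_update, args=(range(i * 100, (i + 1) * 100),))
    for i in range(8)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

After all threads finish, every contributed element is present exactly once.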