import functools
import threading
import warnings
from copy import deepcopy
from typing import Any, Callable, Optional, List, TypeVar, Generic
from collections.abc import Iterable, Iterator
from thread_factory.utilities.interfaces.disposable import IDisposable
_T = TypeVar('_T')
[docs]
class ConcurrentList(Generic[_T], IDisposable):
"""
A thread-safe list implementation using an underlying Python list,
a reentrant lock for synchronization, and an atomic counter for fast,
lock-free retrieval of the length.
This class mimics many of the behaviors of a native Python list,
including slicing, in-place operators, and common utility methods.
It is designed for Python 3.13+ No-GIL environments.
"""
__slots__ = IDisposable.__slots__ + ["_lock", "_list", "_freeze"]
def __init__(self, initial: Optional[Iterable[_T]] = None) -> None:
"""
Initialize the ConcurrentList.
Args:
initial (Iterable[_T], optional): An iterable to initialize the list.
"""
super().__init__()
self._lock = threading.RLock()
self._list: List[_T] = list(initial) if initial else []
self._freeze = False
[docs]
def dispose(self) -> None:
"""
Dispose (clear) this ConcurrentList, releasing its contents.
Once disposed, `_disposed` becomes True and the internal dict is cleared.
No further usage checks are enforced, so the user must avoid calling
other methods after disposal.
This method is idempotent — multiple calls won't cause errors.
"""
if not self._disposed:
with self._lock:
self._list.clear()
self._disposed = True
warnings.warn(
"Your ConcurrentList has been disposed and should not be used further. ",
UserWarning
)
[docs]
def freeze(self) -> None:
"""
Freeze the dictionary to prevent further modifications.
This is useful for making the dictionary immutable after initialization.
"""
with self._lock:
self._freeze = True
@property
def is_frozen(self) -> bool:
"""
Check if the dictionary is frozen.
Returns:
bool: True if the dictionary is frozen, False otherwise.
"""
return self._freeze
[docs]
def unfreeze(self) -> None:
"""
Unfreeze the dictionary to allow modifications.
This is useful for making the dictionary mutable again after being frozen.
"""
with self._lock:
self._freeze = False
def __getitem__(self, index: int | slice) -> _T | List[_T]:
"""
Get an item or a slice from the list.
If ``index`` is an integer, this returns a single item.
If ``index`` is a slice, this returns a shallow copy of that slice.
Args:
index (int or slice): The index or slice.
Returns:
_T or List[_T]:
- A single item if index is an integer.
- A shallow copy of the slice if index is a slice.
Raises:
IndexError: If the index is out of range.
"""
if self._freeze:
if isinstance(index, int):
try:
return self._list[index]
except IndexError:
raise IndexError("ConcurrentList index out of range")
else: # slice
# Return a shallow copy of the slice
return self._list[index].copy()
else:
with self._lock:
if isinstance(index, int):
try:
return self._list[index]
except IndexError:
raise IndexError("ConcurrentList index out of range")
else: # slice
# Return a shallow copy of the slice
return self._list[index].copy()
def __setitem__(self, index: int | slice, value: _T | Iterable[_T]) -> None:
"""
Set an item or slice in the list.
If ``index`` is an integer, this sets a single item at that index.
If ``index`` is a slice, this replaces that slice with the contents of ``value``.
For slice assignment, adjusts the atomic counter appropriately.
Args:
index (int or slice): The index or slice.
value (_T or Iterable[_T]): The new value(s).
Raises:
IndexError: If the index is out of range.
"""
if self._freeze:
raise TypeError("Cannot modify a frozen ConcurrentList")
else:
with self._lock:
if isinstance(index, int):
try:
# If value is an iterable, type checkers might complain, so ignore for brevity
self._list[index] = value # type: ignore
except IndexError:
raise IndexError("ConcurrentList index out of range")
else:
old_slice = self._list[index]
# Ensure we're assigning a list if it's an iterable, or treat as single item
if isinstance(value, Iterable) and not isinstance(value, str):
value_list = list(value)
else:
# Fallback if someone calls slice assignment with a single non-iterable
# You could also just raise TypeError here if you prefer stricter behavior
value_list = [value] # type: ignore
self._list[index] = value_list
def __delitem__(self, index: int | slice) -> None:
"""
Delete an item or slice from the list.
If `index` is an integer, deletes the single element at that index.
If `index` is a slice, deletes all elements in that slice.
Args:
index (int or slice): The index or slice to delete.
Raises:
IndexError: If the index is out of range (for int index).
"""
if self._freeze:
raise TypeError("Cannot modify a frozen ConcurrentList")
else:
with self._lock:
try:
del self._list[index]
except IndexError:
# Only raise a custom message for int index, slices behave differently
if isinstance(index, int):
raise IndexError("ConcurrentList index out of range")
else:
# Re-raise slice-related errors (could be ValueError or IndexError)
raise
[docs]
def append(self, item: _T) -> None:
"""
Append an item to the end of the list.
Args:
item (_T): The item to append.
"""
if self._freeze:
raise TypeError("Cannot modify a frozen ConcurrentList")
else:
with self._lock:
self._list.append(item)
[docs]
def extend(self, items: Iterable[_T]) -> None:
"""
Extend the list by appending elements from the iterable.
Args:
items (Iterable[_T]): An iterable of items to add.
Raises:
TypeError: If items is not iterable (e.g., if it's None).
"""
if self._freeze:
raise TypeError("Cannot modify a frozen ConcurrentList")
else:
with self._lock:
for x in items:
self._list.append(x)
[docs]
def insert(self, index: int, item: _T) -> None:
"""
Insert an item at the specified index.
Args:
index (int): The index at which to insert.
item (_T): The item to insert.
Raises:
IndexError: If the index is out of range (depending on desired behavior).
"""
if self._freeze:
raise TypeError("Cannot modify a frozen ConcurrentList")
else:
with self._lock:
# Python's list.insert clamps the index if out of range, but you can raise if you prefer
self._list.insert(index, item)
[docs]
def remove(self, item: _T) -> None:
"""
Remove the first occurrence of an item from the list.
Args:
item (_T): The item to remove.
Raises:
ValueError: If the item is not found.
"""
if self._freeze:
raise TypeError("Cannot modify a frozen ConcurrentList")
else:
with self._lock:
try:
self._list.remove(item)
except ValueError:
raise ValueError(f"'{item}' not in ConcurrentList")
[docs]
def pop(self, index: int = -1) -> _T:
"""
Remove and return the item at the given index (default is last).
Args:
index (int, optional): The index to pop. Defaults to -1.
Returns:
_T: The popped item.
Raises:
IndexError: If the list is empty or index is out of range.
"""
if self._freeze:
raise TypeError("Cannot modify a frozen ConcurrentList")
else:
with self._lock:
if not self._list:
raise IndexError("pop from empty ConcurrentList")
try:
return self._list.pop(index)
except IndexError:
raise IndexError("ConcurrentList index out of range for pop")
[docs]
def clear(self) -> None:
"""
Remove all items from the list.
"""
if self._freeze:
raise TypeError("Cannot modify a frozen ConcurrentList")
else:
with self._lock:
self._list.clear()
def __len__(self) -> int:
"""
Return the number of items in the list using the atomic counter.
Returns:
int: The current size of the list.
"""
if self._freeze:
return len(self._list)
else:
with self._lock:
return len(self._list)
def __iter__(self) -> Iterator[_T]:
"""
Return an iterator over a shallow copy of the list.
"""
if self._freeze:
return iter(self._list)
else:
with self._lock:
return iter(self._list.copy())
def __contains__(self, item: Any) -> bool:
"""
Check if an item is in the list.
Args:
item (Any): The item to check for.
Returns:
bool: True if the item is present, False otherwise.
"""
if self._freeze:
return item in self._list
else:
with self._lock:
return item in self._list
def __repr__(self) -> str:
"""
Return the official string representation of the ConcurrentList.
"""
if self._freeze:
return f"{self.__class__.__name__}({self._list!r})"
else:
with self._lock:
return f"{self.__class__.__name__}({self._list!r})"
def __str__(self) -> str:
"""
Return the informal string representation of the ConcurrentList.
"""
if self._freeze:
return str(self._list)
else:
with self._lock:
return str(self._list)
def __eq__(self, other: Any) -> bool:
"""
Check equality with another ConcurrentList or a standard list.
Args:
other (Any): The object to compare.
Returns:
bool: True if equal, False otherwise.
"""
if isinstance(other, ConcurrentList):
# Acquire locks in a defined order to avoid deadlocks
if id(self) < id(other):
first, second = self, other
else:
first, second = other, self
with first._lock, second._lock:
return self._list == other._list
elif isinstance(other, list):
with self._lock:
return self._list == other
return False
def __ne__(self, other: Any) -> bool:
"""
Check inequality with another ConcurrentList or a standard list.
"""
return not self.__eq__(other)
def __bool__(self) -> bool:
"""
Return True if the list is non-empty.
"""
return len(self._list) != 0
def __reversed__(self) -> Iterable[_T]:
"""
Return a reverse iterator over a copy of the list.
"""
if self._freeze:
return reversed(self._list)
else:
with self._lock:
return reversed(self._list.copy())
def __iadd__(self, other: Iterable[_T]) -> 'ConcurrentList[_T]':
"""
Implements in-place addition (+=).
Args:
other (Iterable[_T]): The iterable to extend with.
Returns:
ConcurrentList[_T]: self
"""
if self._freeze:
raise TypeError("Cannot modify a frozen ConcurrentList")
else:
self.extend(other)
return self
def __imul__(self, n: int) -> 'ConcurrentList[_T]':
"""
Implements in-place multiplication (*=).
Args:
n (int): The multiplier.
Returns:
ConcurrentList[_T]: self
Raises:
TypeError: If n is not an integer.
"""
if self._freeze:
raise TypeError("Cannot modify a frozen ConcurrentList")
else:
if not isinstance(n, int):
raise TypeError("can't multiply sequence by non-int of type '{}'".format(type(n).__name__))
with self._lock:
self._list *= n
return self
def __mul__(self, n: int) -> 'ConcurrentList[_T]':
"""
Implements multiplication (*).
Args:
n (int): The multiplier.
Returns:
ConcurrentList[_T]: A new ConcurrentList with the elements repeated.
Raises:
TypeError: If n is not an integer.
"""
if not isinstance(n, int):
raise TypeError("can't multiply sequence by non-int of type '{}'".format(type(n).__name__))
if self._freeze:
return ConcurrentList(initial=self._list * n)
else:
with self._lock:
return ConcurrentList(initial=self._list * n)
def __rmul__(self, n: int) -> 'ConcurrentList[_T]':
"""
Implements reverse multiplication.
"""
return self.__mul__(n)
[docs]
def index(self, item: Any, start: int = 0, end: Optional[int] = None) -> int:
"""
Return first index of value.
Args:
item (Any): The item to find.
start (int, optional): Start index for search.
end (int, optional): End index for search.
Returns:
int: The index of the item.
Raises:
ValueError: If the item is not present.
"""
if self._freeze:
return self._list.index(item, start, end if end is not None else len(self._list))
else:
with self._lock:
return self._list.index(item, start, end if end is not None else len(self._list))
[docs]
def count(self, item: Any) -> int:
"""
Return the number of occurrences of a value.
Args:
item (Any): The item to count.
Returns:
int: The number of occurrences.
"""
if self._freeze:
return self._list.count(item)
else:
with self._lock:
return self._list.count(item)
def __copy__(self) -> 'ConcurrentList[_T]':
"""
Return a shallow copy of the ConcurrentList.
This method is called by copy.copy().
"""
if self._freeze:
return ConcurrentList(initial=self._list.copy())
else:
with self._lock:
return ConcurrentList(initial=self._list.copy())
[docs]
def copy(self) -> "ConcurrentList[_T]":
"""
Return a shallow copy of the ConcurrentList.
Returns:
ConcurrentList[_T]: A new ConcurrentList with copied items.
"""
# Reusing the logic already present in __copy__
return self.__copy__()
def __deepcopy__(self, memo: dict) -> 'ConcurrentList[_T]':
"""
Return a deep copy of the ConcurrentList.
Args:
memo (dict): Memoization dictionary for deepcopy.
Returns:
ConcurrentList[_T]: A deep copy of this ConcurrentList.
"""
if self._freeze:
# If the list is frozen, we can safely deepcopy it
return ConcurrentList(initial=deepcopy(self._list, memo))
else:
with self._lock:
return ConcurrentList(initial=deepcopy(self._list, memo))
[docs]
def to_list(self) -> List[_T]:
"""
Return a shallow copy of the internal list.
Returns:
List[_T]: A copy of the list.
"""
if self._freeze:
# If the list is frozen, we can safely return a copy
return list(self._list)
else:
with self._lock:
return list(self._list)
[docs]
def batch_update(self, func: Callable[[List[_T]], None]) -> None:
"""
Perform a batch update on the list under a single lock acquisition.
This method allows multiple operations to be performed atomically.
Args:
func (Callable[[List[_T]], None]): A function that accepts the internal list as its only argument.
The function should perform all necessary mutations.
"""
if self._freeze:
raise TypeError("Cannot modify a frozen ConcurrentList")
else:
with self._lock:
func(self._list)
[docs]
def sort(self, key: Optional[Callable[[_T], Any]] = None, reverse: bool = False) -> None:
"""
Sort the list in-place.
Args:
key (Callable[[_T], Any], optional): A function used to extract a comparison key.
reverse (bool, optional): If True, the list elements are sorted as if each comparison were reversed.
"""
if self._freeze:
raise TypeError("Cannot modify a frozen ConcurrentList")
else:
with self._lock:
self._list.sort(key=key, reverse=reverse)
[docs]
def reverse(self) -> None:
"""
Reverse the elements of the list in-place.
"""
if self._freeze:
raise TypeError("Cannot modify a frozen ConcurrentList")
else:
with self._lock:
self._list.reverse()
[docs]
def map(self, func: Callable[[_T], Any]) -> 'ConcurrentList[Any]':
"""
Apply a function to all elements and return a new ConcurrentList.
Args:
func (Callable[[_T], Any]): The function to apply.
Returns:
ConcurrentList[Any]: A new ConcurrentList with the function applied to each element.
"""
if self._freeze:
return ConcurrentList(initial=list(map(func, self._list.copy())))
else:
with self._lock:
return ConcurrentList(initial=list(map(func, self._list.copy())))
[docs]
def filter(self, func: Callable[[_T], bool]) -> 'ConcurrentList[_T]':
"""
Filter elements based on a function and return a new ConcurrentList.
Args:
func (Callable[[_T], bool]): The filter function.
Returns:
ConcurrentList[_T]: A new ConcurrentList containing only elements where func(element) is True.
"""
if self._freeze:
return ConcurrentList(initial=list(filter(func, self._list.copy())))
else:
with self._lock:
return ConcurrentList(initial=list(filter(func, self._list.copy())))
[docs]
def reduce(self, func: Callable[[Any, _T], Any], initial: Optional[Any] = None) -> Any:
"""
Apply a function of two arguments cumulatively to the items of the list.
Args:
func (Callable[[Any, _T], Any]): Function to apply.
initial (Any, optional): Starting value.
Returns:
Any: The reduced value.
Raises:
TypeError: If the list is empty and no initial value is provided.
"""
# Acquire lock only if not frozen
snapshot = []
if self._freeze:
# When frozen, we can access _list directly without the lock
snapshot = list(self._list)
else:
with self._lock:
snapshot = list(self._list)
# Once copied, we can reduce outside the lock
if not snapshot and initial is None:
raise TypeError("reduce() of empty ConcurrentList with no initial value")
if initial is None:
return functools.reduce(func, snapshot)
else:
return functools.reduce(func, snapshot, initial)
[docs]
def update(self, other: Iterable[_T]) -> None:
"""
Update the list with elements from another iterable.
Args:
other (Iterable[_T]): The iterable to update from.
"""
if self._freeze:
raise TypeError("Cannot modify a frozen ConcurrentList")
else:
with self._lock:
self._list.extend(other)
# -----------------------------------------------------------------------------------
# Disposable Implementation
# -----------------------------------------------------------------------------------
def __enter__(self):
"""
Enter the runtime context.
- Acquires the internal lock for direct access.
- Allows `with ConcurrentList(...) as cc:` style usage.
- WARNING: Using the context manager bypasses the thread-safe method interface.
You are now responsible for ensuring correct multithreaded behavior.
"""
warnings.warn(
"Direct access to the internals via the context manager bypasses "
"the thread-safe interface. Use with extreme caution.",
UserWarning
)
self._lock.acquire()
return self
def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
"""
Exit the runtime context.
Responsibilities:
- Releases the internal lock acquired in `__enter__()`.
- Automatically calls `dispose()` to ensure the object is cleaned up.
- This pattern ensures the object is safely disposed even if an exception
occurs within the `with` block.
Notes:
- The object should be considered invalid after exiting the context.
- This design mimics resource safety patterns seen in systems like C#'s `IDisposable`
and C++ RAII.
- Users are free to manage `dispose()` manually if they choose not to use the
context manager.
Args:
exc_type: Exception type (if raised).
exc_val: Exception value (if raised).
exc_tb: Exception traceback (if raised).
"""
self._lock.release()
self.dispose()