Added enumerable order & added array & removed collection
Some checks failed
Test before pr merge / test-lint (pull_request) Successful in 6s
Build on push / prepare (push) Successful in 9s
Build on push / core (push) Successful in 18s
Build on push / query (push) Successful in 19s
Build on push / dependency (push) Successful in 18s
Build on push / mail (push) Successful in 16s
Build on push / application (push) Successful in 19s
Build on push / database (push) Successful in 19s
Build on push / translation (push) Successful in 20s
Build on push / auth (push) Successful in 15s
Build on push / api (push) Has been cancelled

This commit is contained in:
2025-09-24 19:25:26 +02:00
parent 01a2ff7166
commit 52d1f51ce2
10 changed files with 176 additions and 274 deletions

View File

@@ -1,9 +1,8 @@
from cpl.core.console import Console from cpl.core.console import Console
from cpl.core.utils.benchmark import Benchmark from cpl.core.utils.benchmark import Benchmark
from cpl.query.collection import Collection
from cpl.query.enumerable import Enumerable from cpl.query.enumerable import Enumerable
from cpl.query.list import List
from cpl.query.immutable_list import ImmutableList from cpl.query.immutable_list import ImmutableList
from cpl.query.list import List
from cpl.query.set import Set from cpl.query.set import Set
@@ -24,11 +23,23 @@ def _default():
s.add(1) s.add(1)
Console.write_line(s) Console.write_line(s)
data = Enumerable(
[
{"name": "Alice", "age": 30},
{"name": "Dave", "age": 35},
{"name": "Charlie", "age": 25},
{"name": "Bob", "age": 25},
]
)
Console.write_line(data.order_by(lambda x: x["age"]).to_list())
Console.write_line(data.order_by(lambda x: x["age"]).then_by(lambda x: x["name"]).to_list())
Console.write_line(data.order_by(lambda x: x["name"]).then_by(lambda x: x["age"]).to_list())
def t_benchmark(data: list): def t_benchmark(data: list):
Benchmark.all("Enumerable", lambda: Enumerable(data).where(lambda x: x % 2 == 0).select(lambda x: x * 2).to_list()) Benchmark.all("Enumerable", lambda: Enumerable(data).where(lambda x: x % 2 == 0).select(lambda x: x * 2).to_list())
Benchmark.all("Set", lambda: Set(data).where(lambda x: x % 2 == 0).select(lambda x: x * 2).to_list()) Benchmark.all("Set", lambda: Set(data).where(lambda x: x % 2 == 0).select(lambda x: x * 2).to_list())
Benchmark.all("Collection", lambda: Collection(data).where(lambda x: x % 2 == 0).select(lambda x: x * 2).to_list())
Benchmark.all("List", lambda: List(data).where(lambda x: x % 2 == 0).select(lambda x: x * 2).to_list()) Benchmark.all("List", lambda: List(data).where(lambda x: x % 2 == 0).select(lambda x: x * 2).to_list())
Benchmark.all( Benchmark.all(
"ImmutableList", lambda: ImmutableList(data).where(lambda x: x % 2 == 0).select(lambda x: x * 2).to_list() "ImmutableList", lambda: ImmutableList(data).where(lambda x: x % 2 == 0).select(lambda x: x * 2).to_list()
@@ -39,7 +50,7 @@ def t_benchmark(data: list):
def main(): def main():
N = 10_000_000 N = 10_000_000
data = list(range(N)) data = list(range(N))
t_benchmark(data) #t_benchmark(data)
Console.write_line() Console.write_line()
_default() _default()

View File

@@ -1 +1,7 @@
from .array import Array
from .enumerable import Enumerable
from .immutable_list import ImmutableList
from .immutable_set import ImmutableSet
from .list import List
from .ordered_enumerable import OrderedEnumerable
from .set import Set

View File

@@ -0,0 +1,44 @@
from typing import Generic, Iterable, Optional
from cpl.core.typing import T
from cpl.query.list import List
from cpl.query.enumerable import Enumerable
class Array(Generic[T], List[T]):
def __init__(self, length: int, source: Optional[Iterable[T]] = None):
List.__init__(self, source)
self._length = length
@property
def length(self) -> int:
return len(self._source)
def add(self, item: T) -> None:
if self._length == self.length:
raise IndexError("Array is full")
self._source.append(item)
def extend(self, items: Iterable[T]) -> None:
if self._length == self.length:
raise IndexError("Array is full")
self._source.extend(items)
def insert(self, index: int, item: T) -> None:
if index < 0 or index > self.length:
raise IndexError("Index out of range")
self._source.insert(index, item)
def remove(self, item: T) -> None:
self._source.remove(item)
def pop(self, index: int = -1) -> T:
return self._source.pop(index)
def clear(self) -> None:
self._source.clear()
def to_enumerable(self) -> "Enumerable[T]":
from cpl.query.enumerable import Enumerable
return Enumerable(self._source)

View File

@@ -1,173 +0,0 @@
from itertools import islice, groupby
from typing import Generic, Callable, Iterable, Iterator, Dict, Tuple, Optional
from cpl.core.typing import T, R
from cpl.query.list import List
from cpl.query.typing import Predicate, K, Selector
class Collection(Generic[T]):
def __init__(self, source: Iterable[T]):
self._source = source
def __iter__(self) -> Iterator[T]:
return iter(self._source)
@property
def length(self) -> int:
return sum(1 for _ in self._source)
def where(self, f: Predicate) -> "Collection[T]":
return Collection([x for x in self._source if f(x)])
def select(self, f: Selector) -> "Collection[R]":
return Collection([f(x) for x in self._source])
def select_many(self, f: Callable[[T], Iterable[R]]) -> "Collection[R]":
return Collection([y for x in self._source for y in f(x)])
def take(self, count: int) -> "Collection[T]":
return Collection(islice(self._source, count))
def skip(self, count: int) -> "Collection[T]":
return Collection(islice(self._source, count, None))
def take_while(self, f: Predicate) -> "Collection[T]":
result = []
for x in self._source:
if f(x):
result.append(x)
else:
break
return Collection(result)
def skip_while(self, f: Predicate) -> "Collection[T]":
it = iter(self._source)
for x in it:
if not f(x):
return Collection([x] + list(it))
return Collection([])
def distinct(self) -> "Collection[T]":
seen = set()
return Collection([x for x in self._source if not (x in seen or seen.add(x))])
def union(self, other: Iterable[T]) -> "Collection[T]":
return self.concat(other).distinct()
def intersect(self, other: Iterable[T]) -> "Collection[T]":
other_set = set(other)
return Collection([x for x in self._source if x in other_set])
def except_(self, other: Iterable[T]) -> "Collection[T]":
other_set = set(other)
return Collection([x for x in self._source if x not in other_set])
def concat(self, other: Iterable[T]) -> "Collection[T]":
return Collection(self._source) + list(other)
def count(self) -> int:
return len(list(self._source))
def sum(self, f: Optional[Selector] = None) -> R:
return sum([f(x) for x in self._source]) if f else sum(self._source) # type: ignore
def min(self, f: Optional[Selector] = None) -> R:
return min([f(x) for x in self._source]) if f else min(self._source) # type: ignore
def max(self, f: Optional[Selector] = None) -> R:
return max([f(x) for x in self._source]) if f else max(self._source) # type: ignore
def average(self, f: Optional[Callable[[T], float]] = None) -> float:
values = [f(x) for x in self._source] if f else list(self._source)
return sum(values) / len(values) if values else 0.0
def aggregate(self, func: Callable[[R, T], R], seed: Optional[R] = None) -> R:
it = iter(self._source)
if seed is None:
acc = next(it) # type: ignore
else:
acc = seed
for x in it:
acc = func(acc, x)
return acc
def any(self, f: Optional[Predicate] = None) -> bool:
return any(f(x) if f else x for x in self._source)
def all(self, f: Predicate) -> bool:
return all(f(x) for x in self._source)
def contains(self, value: T) -> bool:
return value in self._source
def sequence_equal(self, other: Iterable[T]) -> bool:
return list(self._source) == list(other)
def group_by(self, key_f: Callable[[T], K]) -> "Collection[Tuple[K, List[T]]]":
sorted_data = sorted(self._source, key=key_f)
return Collection([(key, list(group)) for key, group in groupby(sorted_data, key=key_f)])
def join(
self, inner: Iterable[R], outer_key: Callable[[T], K], inner_key: Callable[[R], K], result: Callable[[T, R], R]
) -> "Collection[R]":
lookup: Dict[K, List[R]] = {}
for i in inner:
k = inner_key(i)
lookup.setdefault(k, []).append(i)
return Collection([result(o, i) for o in self._source for i in lookup.get(outer_key(o), [])])
def first(self, f: Optional[Predicate] = None) -> T:
if f:
for x in self._source:
if f(x):
return x
raise ValueError("No matching element")
return next(iter(self._source))
def first_or_default(self, default: Optional[T] = None) -> Optional[T]:
return next(iter(self._source), default)
def last(self) -> T:
return list(self._source)[-1]
def single(self, f: Optional[Predicate] = None) -> T:
items = [x for x in self._source if f(x)] if f else list(self._source)
if len(items) != 1:
raise ValueError("Sequence does not contain exactly one element")
return items[0]
def to_list(self) -> List[T]:
return List(self._source)
def to_enumerable(self) -> "Enumerable[T]":
from cpl.query.enumerable import Enumerable
return Enumerable(self._source)
def to_dict(self, key_f: Callable[[T], K], value_f: Selector) -> Dict[K, R]:
return {key_f(x): value_f(x) for x in self._source}
def cast(self, t: Selector) -> "Collection[R]":
return Collection([t(x) for x in self._source])
def of_type(self, t: type) -> "Collection[T]":
return Collection([x for x in self._source if isinstance(x, t)])
def reverse(self) -> "Collection[T]":
return Collection(reversed(list(self._source)))
def zip(self, other: Iterable[R]) -> "Collection[Tuple[T, R]]":
return Collection(zip(self._source, other))
@staticmethod
def range(start: int, count: int) -> "Collection[int]":
return Collection(range(start, start + count))
@staticmethod
def repeat(value: T, count: int) -> "Collection[T]":
return Collection([value for _ in range(count)])
@staticmethod
def empty() -> "Collection[T]":
return Collection([])

View File

@@ -167,17 +167,13 @@ class Enumerable(Generic[T]):
def to_list(self) -> "List[T]": def to_list(self) -> "List[T]":
from cpl.query.list import List from cpl.query.list import List
return List(self._source)
def to_collection(self) -> "Collection[T]": return List(self)
from cpl.query.collection import Collection
return Collection(self._source)
def to_set(self) -> "Set[T]": def to_set(self) -> "Set[T]":
from cpl.query.set import Set from cpl.query.set import Set
return Set(self._source) return Set(self)
def to_dict(self, key_f: Callable[[T], K], value_f: Selector) -> Dict[K, R]: def to_dict(self, key_f: Callable[[T], K], value_f: Selector) -> Dict[K, R]:
return {key_f(x): value_f(x) for x in self._source} return {key_f(x): value_f(x) for x in self._source}
@@ -194,6 +190,16 @@ class Enumerable(Generic[T]):
def zip(self, other: Iterable[R]) -> "Enumerable[Tuple[T, R]]": def zip(self, other: Iterable[R]) -> "Enumerable[Tuple[T, R]]":
return Enumerable(zip(self._source, other)) return Enumerable(zip(self._source, other))
def order_by(self, key_selector: Callable[[T], K]) -> "OrderedEnumerable[T]":
from cpl.query.ordered_enumerable import OrderedEnumerable
return OrderedEnumerable(self._source, [(key_selector, False)])
def order_by_descending(self, key_selector: Callable[[T], K]) -> "OrderedEnumerable[T]":
from cpl.query.ordered_enumerable import OrderedEnumerable
return OrderedEnumerable(self._source, [(key_selector, True)])
@staticmethod @staticmethod
def range(start: int, count: int) -> "Enumerable[int]": def range(start: int, count: int) -> "Enumerable[int]":
return Enumerable(range(start, start + count)) return Enumerable(range(start, start + count))

View File

@@ -6,40 +6,60 @@ from cpl.query.enumerable import Enumerable
class ImmutableList(Generic[T], Enumerable[T]): class ImmutableList(Generic[T], Enumerable[T]):
def __init__(self, source: Optional[Iterable[T]] = None): def __init__(self, source: Optional[Iterable[T]] = None):
Enumerable.__init__(self, [])
if source is None: if source is None:
source = [] source = []
elif not isinstance(source, list):
source = list(source)
Enumerable.__init__(self, source) self.__source = source
@property @property
def _items(self) -> list[T]: def _source(self) -> list[T]:
return list(self._source) return self.__source
@_source.setter
def _source(self, value: list[T]) -> None:
self.__source = value
def __iter__(self) -> Iterator[T]: def __iter__(self) -> Iterator[T]:
return iter(self._items) return iter(self._source)
def __len__(self) -> int: def __len__(self) -> int:
return len(self._items) return len(self._source)
def __getitem__(self, index: int) -> T: def __getitem__(self, index: int) -> T:
return self._items[index] return self._source[index]
def __contains__(self, item: T) -> bool: def __contains__(self, item: T) -> bool:
return item in self._items return item in self._source
def __repr__(self) -> str: def __repr__(self) -> str:
return f"List({self._items!r})" return f"List({self._source!r})"
@property @property
def length(self) -> int: def length(self) -> int:
return len(self._items) return len(self._source)
def add(self, item: T) -> None:
self._source.append(item)
def extend(self, items: Iterable[T]) -> None:
self._source.extend(items)
def insert(self, index: int, item: T) -> None:
self._source.insert(index, item)
def remove(self, item: T) -> None:
self._source.remove(item)
def pop(self, index: int = -1) -> T:
return self._source.pop(index)
def clear(self) -> None:
self._source.clear()
def to_enumerable(self) -> "Enumerable[T]": def to_enumerable(self) -> "Enumerable[T]":
from cpl.query.enumerable import Enumerable from cpl.query.enumerable import Enumerable
return Enumerable(self._items) return Enumerable(self._source)
def to_collection(self) -> "Collection[T]":
from cpl.query.collection import Collection
return Collection(self._items)

View File

@@ -6,11 +6,13 @@ from cpl.query.enumerable import Enumerable
class ImmutableSet(Generic[T], Enumerable[T]): class ImmutableSet(Generic[T], Enumerable[T]):
def __init__(self, source: Optional[Iterable[T]] = None): def __init__(self, source: Optional[Iterable[T]] = None):
Enumerable.__init__(self, [])
if source is None: if source is None:
source = set() source = set()
elif not isinstance(source, set):
source = set(source)
self.__source = source self.__source = source
Enumerable.__init__(self, [])
@property @property
def _source(self) -> set[T]: def _source(self) -> set[T]:
@@ -41,4 +43,5 @@ class ImmutableSet(Generic[T], Enumerable[T]):
def to_enumerable(self) -> "Enumerable[T]": def to_enumerable(self) -> "Enumerable[T]":
from cpl.query.enumerable import Enumerable from cpl.query.enumerable import Enumerable
return Enumerable(self._source)
return Enumerable(self._source)

View File

@@ -1,66 +1,36 @@
from typing import Generic, Iterable, Iterator, Optional from typing import Generic, Iterable, Optional
from cpl.core.typing import T from cpl.core.typing import T
from cpl.query.immutable_list import ImmutableList
from cpl.query.enumerable import Enumerable from cpl.query.enumerable import Enumerable
class List(Generic[T], Enumerable[T]): class List(Generic[T], ImmutableList[T]):
def __init__(self, source: Optional[Iterable[T]] = None): def __init__(self, source: Optional[Iterable[T]] = None):
if source is None: ImmutableList.__init__(self, source)
source = []
Enumerable.__init__(self, source)
@property
def _items(self) -> list[T]:
return list(self._source)
def __iter__(self) -> Iterator[T]:
return iter(self._items)
def __len__(self) -> int:
return len(self._items)
def __getitem__(self, index: int) -> T:
return self._items[index]
def __setitem__(self, index: int, value: T) -> None: def __setitem__(self, index: int, value: T) -> None:
self._items[index] = value self._source[index] = value
def __contains__(self, item: T) -> bool:
return item in self._items
def __repr__(self) -> str:
return f"List({self._items!r})"
@property
def length(self) -> int:
return len(self._items)
def add(self, item: T) -> None: def add(self, item: T) -> None:
self._items.append(item) self._source.append(item)
def extend(self, items: Iterable[T]) -> None: def extend(self, items: Iterable[T]) -> None:
self._items.extend(items) self._source.extend(items)
def insert(self, index: int, item: T) -> None: def insert(self, index: int, item: T) -> None:
self._items.insert(index, item) self._source.insert(index, item)
def remove(self, item: T) -> None: def remove(self, item: T) -> None:
self._items.remove(item) self._source.remove(item)
def pop(self, index: int = -1) -> T: def pop(self, index: int = -1) -> T:
return self._items.pop(index) return self._source.pop(index)
def clear(self) -> None: def clear(self) -> None:
self._items.clear() self._source.clear()
def to_enumerable(self) -> "Enumerable[T]": def to_enumerable(self) -> "Enumerable[T]":
from cpl.query.enumerable import Enumerable from cpl.query.enumerable import Enumerable
return Enumerable(self._items) return Enumerable(self._source)
def to_collection(self) -> "Collection[T]":
from cpl.query.collection import Collection
return Collection(self._items)

View File

@@ -0,0 +1,40 @@
from typing import Callable, List, Generic, Iterator
from cpl.core.typing import T
from cpl.query.enumerable import Enumerable
from cpl.query.typing import K
class OrderedEnumerable(Enumerable[T]):
def __init__(self, source, key_selectors: List[tuple[Callable[[T], K], bool]]):
super().__init__(source)
self._key_selectors = key_selectors
def __iter__(self) -> Iterator[T]:
def composite_key(x):
keys = []
for selector, descending in self._key_selectors:
k = selector(x)
keys.append((k, not descending))
return tuple(k if asc else _DescendingWrapper(k) for k, asc in keys)
return iter(sorted(self._source, key=composite_key))
def then_by(self, key_selector: Callable[[T], K]) -> "OrderedEnumerable[T]":
return OrderedEnumerable(self._source, self._key_selectors + [(key_selector, False)])
def then_by_descending(self, key_selector: Callable[[T], K]) -> "OrderedEnumerable[T]":
return OrderedEnumerable(self._source, self._key_selectors + [(key_selector, True)])
class _DescendingWrapper:
def __init__(self, value):
self.value = value
def __lt__(self, other):
return self.value > other.value
def __gt__(self, other):
return self.value < other.value
def __eq__(self, other):
return self.value == other.value

View File

@@ -1,39 +1,13 @@
from typing import Generic, Iterable, Iterator, Optional from typing import Generic, Iterable, Optional
from cpl.core.typing import T from cpl.core.typing import T
from cpl.query.immutable_set import ImmutableSet
from cpl.query.enumerable import Enumerable from cpl.query.enumerable import Enumerable
class Set(Generic[T], Enumerable[T]): class Set(Generic[T], ImmutableSet[T]):
def __init__(self, source: Optional[Iterable[T]] = None): def __init__(self, source: Optional[Iterable[T]] = None):
if source is None: ImmutableSet.__init__(self, source)
source = set()
self.__source = source
Enumerable.__init__(self, [])
@property
def _source(self) -> set[T]:
return self.__source
@_source.setter
def _source(self, value: set[T]) -> None:
if not isinstance(value, set):
value = set(value)
self.__source = value
def __iter__(self) -> Iterator[T]:
return iter(self._source)
def __len__(self) -> int:
return len(self._source)
def __contains__(self, item: T) -> bool:
return item in self._source
def __repr__(self) -> str:
return f"Set({self._source!r})"
@property @property
def length(self) -> int: def length(self) -> int:
@@ -50,4 +24,5 @@ class Set(Generic[T], Enumerable[T]):
def to_enumerable(self) -> "Enumerable[T]": def to_enumerable(self) -> "Enumerable[T]":
from cpl.query.enumerable import Enumerable from cpl.query.enumerable import Enumerable
return Enumerable(self._source)
return Enumerable(self._source)