Compare commits

...

1 Commits

Author SHA1 Message Date
ad22bb6ecf Added enumerable order & added array & removed collection
Some checks failed
Test before pr merge / test-lint (pull_request) Failing after 6s
Build on push / prepare (push) Successful in 10s
Build on push / query (push) Successful in 18s
Build on push / core (push) Successful in 25s
Build on push / api (push) Has been cancelled
Build on push / application (push) Has been cancelled
Build on push / auth (push) Has been cancelled
Build on push / database (push) Has been cancelled
Build on push / mail (push) Has been cancelled
Build on push / translation (push) Has been cancelled
Build on push / dependency (push) Has been cancelled
2025-09-24 19:37:14 +02:00
10 changed files with 170 additions and 273 deletions

View File

@@ -1,9 +1,8 @@
from cpl.core.console import Console
from cpl.core.utils.benchmark import Benchmark
from cpl.query.collection import Collection
from cpl.query.enumerable import Enumerable
from cpl.query.list import List
from cpl.query.immutable_list import ImmutableList
from cpl.query.list import List
from cpl.query.set import Set
@@ -24,11 +23,23 @@ def _default():
s.add(1)
Console.write_line(s)
data = Enumerable(
[
{"name": "Alice", "age": 30},
{"name": "Dave", "age": 35},
{"name": "Charlie", "age": 25},
{"name": "Bob", "age": 25},
]
)
Console.write_line(data.order_by(lambda x: x["age"]).to_list())
Console.write_line(data.order_by(lambda x: x["age"]).then_by(lambda x: x["name"]).to_list())
Console.write_line(data.order_by(lambda x: x["name"]).then_by(lambda x: x["age"]).to_list())
def t_benchmark(data: list):
Benchmark.all("Enumerable", lambda: Enumerable(data).where(lambda x: x % 2 == 0).select(lambda x: x * 2).to_list())
Benchmark.all("Set", lambda: Set(data).where(lambda x: x % 2 == 0).select(lambda x: x * 2).to_list())
Benchmark.all("Collection", lambda: Collection(data).where(lambda x: x % 2 == 0).select(lambda x: x * 2).to_list())
Benchmark.all("List", lambda: List(data).where(lambda x: x % 2 == 0).select(lambda x: x * 2).to_list())
Benchmark.all(
"ImmutableList", lambda: ImmutableList(data).where(lambda x: x % 2 == 0).select(lambda x: x * 2).to_list()
@@ -39,7 +50,7 @@ def t_benchmark(data: list):
def main():
N = 10_000_000
data = list(range(N))
t_benchmark(data)
#t_benchmark(data)
Console.write_line()
_default()

View File

@@ -1 +1,7 @@
from .array import Array
from .enumerable import Enumerable
from .immutable_list import ImmutableList
from .immutable_set import ImmutableSet
from .list import List
from .ordered_enumerable import OrderedEnumerable
from .set import Set

View File

@@ -0,0 +1,44 @@
from typing import Generic, Iterable, Optional
from cpl.core.typing import T
from cpl.query.list import List
from cpl.query.enumerable import Enumerable
class Array(Generic[T], List[T]):
def __init__(self, length: int, source: Optional[Iterable[T]] = None):
List.__init__(self, source)
self._length = length
@property
def length(self) -> int:
return len(self._source)
def add(self, item: T) -> None:
if self._length == self.length:
raise IndexError("Array is full")
self._source.append(item)
def extend(self, items: Iterable[T]) -> None:
if self._length == self.length:
raise IndexError("Array is full")
self._source.extend(items)
def insert(self, index: int, item: T) -> None:
if index < 0 or index > self.length:
raise IndexError("Index out of range")
self._source.insert(index, item)
def remove(self, item: T) -> None:
self._source.remove(item)
def pop(self, index: int = -1) -> T:
return self._source.pop(index)
def clear(self) -> None:
self._source.clear()
def to_enumerable(self) -> "Enumerable[T]":
from cpl.query.enumerable import Enumerable
return Enumerable(self._source)

View File

@@ -1,173 +0,0 @@
from itertools import islice, groupby
from typing import Generic, Callable, Iterable, Iterator, Dict, Tuple, Optional
from cpl.core.typing import T, R
from cpl.query.list import List
from cpl.query.typing import Predicate, K, Selector
class Collection(Generic[T]):
def __init__(self, source: Iterable[T]):
self._source = source
def __iter__(self) -> Iterator[T]:
return iter(self._source)
@property
def length(self) -> int:
return sum(1 for _ in self._source)
def where(self, f: Predicate) -> "Collection[T]":
return Collection([x for x in self._source if f(x)])
def select(self, f: Selector) -> "Collection[R]":
return Collection([f(x) for x in self._source])
def select_many(self, f: Callable[[T], Iterable[R]]) -> "Collection[R]":
return Collection([y for x in self._source for y in f(x)])
def take(self, count: int) -> "Collection[T]":
return Collection(islice(self._source, count))
def skip(self, count: int) -> "Collection[T]":
return Collection(islice(self._source, count, None))
def take_while(self, f: Predicate) -> "Collection[T]":
result = []
for x in self._source:
if f(x):
result.append(x)
else:
break
return Collection(result)
def skip_while(self, f: Predicate) -> "Collection[T]":
it = iter(self._source)
for x in it:
if not f(x):
return Collection([x] + list(it))
return Collection([])
def distinct(self) -> "Collection[T]":
seen = set()
return Collection([x for x in self._source if not (x in seen or seen.add(x))])
def union(self, other: Iterable[T]) -> "Collection[T]":
return self.concat(other).distinct()
def intersect(self, other: Iterable[T]) -> "Collection[T]":
other_set = set(other)
return Collection([x for x in self._source if x in other_set])
def except_(self, other: Iterable[T]) -> "Collection[T]":
other_set = set(other)
return Collection([x for x in self._source if x not in other_set])
def concat(self, other: Iterable[T]) -> "Collection[T]":
return Collection(self._source) + list(other)
def count(self) -> int:
return len(list(self._source))
def sum(self, f: Optional[Selector] = None) -> R:
return sum([f(x) for x in self._source]) if f else sum(self._source) # type: ignore
def min(self, f: Optional[Selector] = None) -> R:
return min([f(x) for x in self._source]) if f else min(self._source) # type: ignore
def max(self, f: Optional[Selector] = None) -> R:
return max([f(x) for x in self._source]) if f else max(self._source) # type: ignore
def average(self, f: Optional[Callable[[T], float]] = None) -> float:
values = [f(x) for x in self._source] if f else list(self._source)
return sum(values) / len(values) if values else 0.0
def aggregate(self, func: Callable[[R, T], R], seed: Optional[R] = None) -> R:
it = iter(self._source)
if seed is None:
acc = next(it) # type: ignore
else:
acc = seed
for x in it:
acc = func(acc, x)
return acc
def any(self, f: Optional[Predicate] = None) -> bool:
return any(f(x) if f else x for x in self._source)
def all(self, f: Predicate) -> bool:
return all(f(x) for x in self._source)
def contains(self, value: T) -> bool:
return value in self._source
def sequence_equal(self, other: Iterable[T]) -> bool:
return list(self._source) == list(other)
def group_by(self, key_f: Callable[[T], K]) -> "Collection[Tuple[K, List[T]]]":
sorted_data = sorted(self._source, key=key_f)
return Collection([(key, list(group)) for key, group in groupby(sorted_data, key=key_f)])
def join(
self, inner: Iterable[R], outer_key: Callable[[T], K], inner_key: Callable[[R], K], result: Callable[[T, R], R]
) -> "Collection[R]":
lookup: Dict[K, List[R]] = {}
for i in inner:
k = inner_key(i)
lookup.setdefault(k, []).append(i)
return Collection([result(o, i) for o in self._source for i in lookup.get(outer_key(o), [])])
def first(self, f: Optional[Predicate] = None) -> T:
if f:
for x in self._source:
if f(x):
return x
raise ValueError("No matching element")
return next(iter(self._source))
def first_or_default(self, default: Optional[T] = None) -> Optional[T]:
return next(iter(self._source), default)
def last(self) -> T:
return list(self._source)[-1]
def single(self, f: Optional[Predicate] = None) -> T:
items = [x for x in self._source if f(x)] if f else list(self._source)
if len(items) != 1:
raise ValueError("Sequence does not contain exactly one element")
return items[0]
def to_list(self) -> List[T]:
return List(self._source)
def to_enumerable(self) -> "Enumerable[T]":
from cpl.query.enumerable import Enumerable
return Enumerable(self._source)
def to_dict(self, key_f: Callable[[T], K], value_f: Selector) -> Dict[K, R]:
return {key_f(x): value_f(x) for x in self._source}
def cast(self, t: Selector) -> "Collection[R]":
return Collection([t(x) for x in self._source])
def of_type(self, t: type) -> "Collection[T]":
return Collection([x for x in self._source if isinstance(x, t)])
def reverse(self) -> "Collection[T]":
return Collection(reversed(list(self._source)))
def zip(self, other: Iterable[R]) -> "Collection[Tuple[T, R]]":
return Collection(zip(self._source, other))
@staticmethod
def range(start: int, count: int) -> "Collection[int]":
return Collection(range(start, start + count))
@staticmethod
def repeat(value: T, count: int) -> "Collection[T]":
return Collection([value for _ in range(count)])
@staticmethod
def empty() -> "Collection[T]":
return Collection([])

View File

@@ -167,17 +167,12 @@ class Enumerable(Generic[T]):
def to_list(self) -> "List[T]":
from cpl.query.list import List
return List(self._source)
def to_collection(self) -> "Collection[T]":
from cpl.query.collection import Collection
return Collection(self._source)
return List(self)
def to_set(self) -> "Set[T]":
from cpl.query.set import Set
return Set(self._source)
return Set(self)
def to_dict(self, key_f: Callable[[T], K], value_f: Selector) -> Dict[K, R]:
return {key_f(x): value_f(x) for x in self._source}
@@ -194,6 +189,14 @@ class Enumerable(Generic[T]):
def zip(self, other: Iterable[R]) -> "Enumerable[Tuple[T, R]]":
return Enumerable(zip(self._source, other))
def order_by(self, key_selector: Callable[[T], K]) -> "OrderedEnumerable[T]":
from cpl.query.ordered_enumerable import OrderedEnumerable
return OrderedEnumerable(self._source, [(key_selector, False)])
def order_by_descending(self, key_selector: Callable[[T], K]) -> "OrderedEnumerable[T]":
from cpl.query.ordered_enumerable import OrderedEnumerable
return OrderedEnumerable(self._source, [(key_selector, True)])
@staticmethod
def range(start: int, count: int) -> "Enumerable[int]":
return Enumerable(range(start, start + count))

View File

@@ -6,40 +6,60 @@ from cpl.query.enumerable import Enumerable
class ImmutableList(Generic[T], Enumerable[T]):
def __init__(self, source: Optional[Iterable[T]] = None):
Enumerable.__init__(self, [])
if source is None:
source = []
elif not isinstance(source, list):
source = list(source)
Enumerable.__init__(self, source)
self.__source = source
@property
def _items(self) -> list[T]:
return list(self._source)
def _source(self) -> list[T]:
return self.__source
@_source.setter
def _source(self, value: list[T]) -> None:
self.__source = value
def __iter__(self) -> Iterator[T]:
return iter(self._items)
return iter(self._source)
def __len__(self) -> int:
return len(self._items)
return len(self._source)
def __getitem__(self, index: int) -> T:
return self._items[index]
return self._source[index]
def __contains__(self, item: T) -> bool:
return item in self._items
return item in self._source
def __repr__(self) -> str:
return f"List({self._items!r})"
return f"List({self._source!r})"
@property
def length(self) -> int:
return len(self._items)
return len(self._source)
def add(self, item: T) -> None:
self._source.append(item)
def extend(self, items: Iterable[T]) -> None:
self._source.extend(items)
def insert(self, index: int, item: T) -> None:
self._source.insert(index, item)
def remove(self, item: T) -> None:
self._source.remove(item)
def pop(self, index: int = -1) -> T:
return self._source.pop(index)
def clear(self) -> None:
self._source.clear()
def to_enumerable(self) -> "Enumerable[T]":
from cpl.query.enumerable import Enumerable
return Enumerable(self._items)
def to_collection(self) -> "Collection[T]":
from cpl.query.collection import Collection
return Collection(self._items)
return Enumerable(self._source)

View File

@@ -6,11 +6,13 @@ from cpl.query.enumerable import Enumerable
class ImmutableSet(Generic[T], Enumerable[T]):
def __init__(self, source: Optional[Iterable[T]] = None):
Enumerable.__init__(self, [])
if source is None:
source = set()
elif not isinstance(source, set):
source = set(source)
self.__source = source
Enumerable.__init__(self, [])
@property
def _source(self) -> set[T]:

View File

@@ -1,66 +1,36 @@
from typing import Generic, Iterable, Iterator, Optional
from typing import Generic, Iterable, Optional
from cpl.core.typing import T
from cpl.query.immutable_list import ImmutableList
from cpl.query.enumerable import Enumerable
class List(Generic[T], Enumerable[T]):
class List(Generic[T], ImmutableList[T]):
def __init__(self, source: Optional[Iterable[T]] = None):
if source is None:
source = []
Enumerable.__init__(self, source)
@property
def _items(self) -> list[T]:
return list(self._source)
def __iter__(self) -> Iterator[T]:
return iter(self._items)
def __len__(self) -> int:
return len(self._items)
def __getitem__(self, index: int) -> T:
return self._items[index]
ImmutableList.__init__(self, source)
def __setitem__(self, index: int, value: T) -> None:
self._items[index] = value
def __contains__(self, item: T) -> bool:
return item in self._items
def __repr__(self) -> str:
return f"List({self._items!r})"
@property
def length(self) -> int:
return len(self._items)
self._source[index] = value
def add(self, item: T) -> None:
self._items.append(item)
self._source.append(item)
def extend(self, items: Iterable[T]) -> None:
self._items.extend(items)
self._source.extend(items)
def insert(self, index: int, item: T) -> None:
self._items.insert(index, item)
self._source.insert(index, item)
def remove(self, item: T) -> None:
self._items.remove(item)
self._source.remove(item)
def pop(self, index: int = -1) -> T:
return self._items.pop(index)
return self._source.pop(index)
def clear(self) -> None:
self._items.clear()
self._source.clear()
def to_enumerable(self) -> "Enumerable[T]":
from cpl.query.enumerable import Enumerable
return Enumerable(self._items)
def to_collection(self) -> "Collection[T]":
from cpl.query.collection import Collection
return Collection(self._items)
return Enumerable(self._source)

View File

@@ -0,0 +1,40 @@
from typing import Callable, List, Generic, Iterator
from cpl.core.typing import T
from cpl.query.enumerable import Enumerable
from cpl.query.typing import K
class OrderedEnumerable(Enumerable[T]):
def __init__(self, source, key_selectors: List[tuple[Callable[[T], K], bool]]):
super().__init__(source)
self._key_selectors = key_selectors
def __iter__(self) -> Iterator[T]:
def composite_key(x):
keys = []
for selector, descending in self._key_selectors:
k = selector(x)
keys.append((k, not descending))
return tuple(k if asc else _DescendingWrapper(k) for k, asc in keys)
return iter(sorted(self._source, key=composite_key))
def then_by(self, key_selector: Callable[[T], K]) -> "OrderedEnumerable[T]":
return OrderedEnumerable(self._source, self._key_selectors + [(key_selector, False)])
def then_by_descending(self, key_selector: Callable[[T], K]) -> "OrderedEnumerable[T]":
return OrderedEnumerable(self._source, self._key_selectors + [(key_selector, True)])
class _DescendingWrapper:
def __init__(self, value):
self.value = value
def __lt__(self, other):
return self.value > other.value
def __gt__(self, other):
return self.value < other.value
def __eq__(self, other):
return self.value == other.value

View File

@@ -1,39 +1,13 @@
from typing import Generic, Iterable, Iterator, Optional
from typing import Generic, Iterable, Optional
from cpl.core.typing import T
from cpl.query.immutable_set import ImmutableSet
from cpl.query.enumerable import Enumerable
class Set(Generic[T], Enumerable[T]):
class Set(Generic[T], ImmutableSet[T]):
def __init__(self, source: Optional[Iterable[T]] = None):
if source is None:
source = set()
self.__source = source
Enumerable.__init__(self, [])
@property
def _source(self) -> set[T]:
return self.__source
@_source.setter
def _source(self, value: set[T]) -> None:
if not isinstance(value, set):
value = set(value)
self.__source = value
def __iter__(self) -> Iterator[T]:
return iter(self._source)
def __len__(self) -> int:
return len(self._source)
def __contains__(self, item: T) -> bool:
return item in self._source
def __repr__(self) -> str:
return f"Set({self._source!r})"
ImmutableSet.__init__(self, source)
@property
def length(self) -> int: