WIP: dev into master #184

Draft
edraft wants to merge 121 commits from dev into master
32 changed files with 813 additions and 935 deletions
Showing only changes of commit 01a2ff7166 - Show all commits

View File

@@ -0,0 +1,49 @@
from cpl.core.console import Console
from cpl.core.utils.benchmark import Benchmark
from cpl.query.collection import Collection
from cpl.query.enumerable import Enumerable
from cpl.query.list import List
from cpl.query.immutable_list import ImmutableList
from cpl.query.set import Set
def _default():
Console.write_line(Enumerable.empty().to_list())
Console.write_line(Enumerable.range(0, 100).length)
Console.write_line(Enumerable.range(0, 100).to_list())
Console.write_line(Enumerable.range(0, 100).where(lambda x: x % 2 == 0).length)
Console.write_line(
Enumerable.range(0, 100).where(lambda x: x % 2 == 0).to_list().select(lambda x: str(x)).to_list()
)
Console.write_line(List)
s =Enumerable.range(0, 10).to_set()
Console.write_line(s)
s.add(1)
Console.write_line(s)
def t_benchmark(data: list):
Benchmark.all("Enumerable", lambda: Enumerable(data).where(lambda x: x % 2 == 0).select(lambda x: x * 2).to_list())
Benchmark.all("Set", lambda: Set(data).where(lambda x: x % 2 == 0).select(lambda x: x * 2).to_list())
Benchmark.all("Collection", lambda: Collection(data).where(lambda x: x % 2 == 0).select(lambda x: x * 2).to_list())
Benchmark.all("List", lambda: List(data).where(lambda x: x % 2 == 0).select(lambda x: x * 2).to_list())
Benchmark.all(
"ImmutableList", lambda: ImmutableList(data).where(lambda x: x % 2 == 0).select(lambda x: x * 2).to_list()
)
Benchmark.all("List comprehension", lambda: [x * 2 for x in data if x % 2 == 0])
def main():
N = 10_000_000
data = list(range(N))
t_benchmark(data)
Console.write_line()
_default()
if __name__ == "__main__":
main()

View File

@@ -14,3 +14,4 @@ UuidId = str | UUID
SerialId = int SerialId = int
Id = UuidId | SerialId Id = UuidId | SerialId
TNumber = int | float | complex

View File

@@ -0,0 +1,57 @@
import time
import tracemalloc
from typing import List, Callable
from cpl.core.console import Console
class Benchmark:
@staticmethod
def all(label: str, func: Callable, iterations: int = 5):
times: List[float] = []
mems: List[float] = []
for _ in range(iterations):
start = time.perf_counter()
func()
end = time.perf_counter()
times.append(end - start)
for _ in range(iterations):
tracemalloc.start()
func()
current, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()
mems.append(peak)
avg_time = sum(times) / len(times)
avg_mem = sum(mems) / len(mems) / (1024 * 1024)
Console.write_line(f"{label:20s} -> min {min(times):.6f}s avg {avg_time:.6f}s mem {avg_mem:.8f} MB")
@staticmethod
def time(label: str, func: Callable, iterations: int = 5):
times: List[float] = []
for _ in range(iterations):
start = time.perf_counter()
func()
end = time.perf_counter()
times.append(end - start)
avg_time = sum(times) / len(times)
Console.write_line(f"{label:20s} -> min {min(times):.6f}s avg {avg_time:.6f}s")
@staticmethod
def memory(label: str, func: Callable, iterations: int = 5):
mems: List[float] = []
for _ in range(iterations):
tracemalloc.start()
func()
current, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()
mems.append(peak)
avg_mem = sum(mems) / len(mems) / (1024 * 1024)
Console.write_line(f"{label:20s} -> mem {avg_mem:.2f} MB")

View File

@@ -0,0 +1,48 @@
from typing import Any
class Number:
@staticmethod
def is_number(value: Any) -> bool:
"""Check if the value is a number (int or float)."""
return isinstance(value, (int, float, complex))
@staticmethod
def to_number(value: Any) -> int | float | complex:
"""
Convert a given value into int, float, or complex.
Raises ValueError if conversion is not possible.
"""
if isinstance(value, (int, float, complex)):
return value
if isinstance(value, str):
value = value.strip()
for caster in (int, float, complex):
try:
return caster(value)
except ValueError:
continue
raise ValueError(f"Cannot convert string '{value}' to number.")
if isinstance(value, bool):
return int(value)
try:
return int(value)
except Exception:
pass
try:
return float(value)
except Exception:
pass
try:
return complex(value)
except Exception:
pass
raise ValueError(f"Cannot convert type {type(value)} to number.")

View File

@@ -1,2 +0,0 @@
def is_number(t: type) -> bool:
return issubclass(t, int) or issubclass(t, float) or issubclass(t, complex)

View File

@@ -1,5 +0,0 @@
from .default_lambda import default_lambda
from .ordered_queryable import OrderedQueryable
from .sequence import Sequence
from .ordered_queryable_abc import OrderedQueryableABC
from .queryable_abc import QueryableABC

View File

@@ -1,2 +0,0 @@
def default_lambda(x: object):
return x

View File

@@ -1,34 +0,0 @@
from collections.abc import Callable
from cpl.query.base.ordered_queryable_abc import OrderedQueryableABC
from cpl.query.exceptions import ArgumentNoneException, ExceptionArgument
class OrderedQueryable(OrderedQueryableABC):
r"""Implementation of :class: `cpl.query.base.ordered_queryable_abc.OrderedQueryableABC`"""
def __init__(self, _t: type, _values: OrderedQueryableABC = None, _func: Callable = None):
OrderedQueryableABC.__init__(self, _t, _values, _func)
def then_by(self, _func: Callable) -> OrderedQueryableABC:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _func is None:
raise ArgumentNoneException(ExceptionArgument.func)
self._funcs.append(_func)
return OrderedQueryable(self.type, sorted(self, key=lambda *args: [f(*args) for f in self._funcs]), _func)
def then_by_descending(self, _func: Callable) -> OrderedQueryableABC:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _func is None:
raise ArgumentNoneException(ExceptionArgument.func)
self._funcs.append(_func)
return OrderedQueryable(
self.type, sorted(self, key=lambda *args: [f(*args) for f in self._funcs], reverse=True), _func
)

View File

@@ -1,38 +0,0 @@
from __future__ import annotations
from abc import abstractmethod
from collections.abc import Callable
from typing import Iterable
from cpl.query.base.queryable_abc import QueryableABC
class OrderedQueryableABC(QueryableABC):
@abstractmethod
def __init__(self, _t: type, _values: Iterable = None, _func: Callable = None):
QueryableABC.__init__(self, _t, _values)
self._funcs: list[Callable] = []
if _func is not None:
self._funcs.append(_func)
@abstractmethod
def then_by(self, func: Callable) -> OrderedQueryableABC:
r"""Sorts OrderedList in ascending order by function
Parameter:
func: :class:`Callable`
Returns:
list of :class:`cpl.query.base.ordered_queryable_abc.OrderedQueryableABC`
"""
@abstractmethod
def then_by_descending(self, func: Callable) -> OrderedQueryableABC:
r"""Sorts OrderedList in descending order by function
Parameter:
func: :class:`Callable`
Returns:
list of :class:`cpl.query.base.ordered_queryable_abc.OrderedQueryableABC`
"""

View File

@@ -1,569 +0,0 @@
from __future__ import annotations
from typing import Optional, Callable, Union, Iterable, Any
from cpl.query._helper import is_number
from cpl.query.base import default_lambda
from cpl.query.base.sequence import Sequence
from cpl.query.exceptions import (
InvalidTypeException,
ArgumentNoneException,
ExceptionArgument,
IndexOutOfRangeException,
)
class QueryableABC(Sequence):
def __init__(self, t: type, values: Iterable = None):
Sequence.__init__(self, t, values)
def all(self, _func: Callable = None) -> bool:
r"""Checks if every element of list equals result found by function
Parameter
---------
func: :class:`Callable`
selected value
Returns
-------
bool
"""
if _func is None:
_func = default_lambda
return self.count(_func) == self.count()
def any(self, _func: Callable = None) -> bool:
r"""Checks if list contains result found by function
Parameter
---------
func: :class:`Callable`
selected value
Returns
-------
bool
"""
if _func is None:
_func = default_lambda
return self.where(_func).count() > 0
def average(self, _func: Callable = None) -> Union[int, float, complex]:
r"""Returns average value of list
Parameter
---------
func: :class:`Callable`
selected value
Returns
-------
Union[int, float, complex]
"""
if _func is None and not is_number(self.type):
raise InvalidTypeException()
return self.sum(_func) / self.count()
def contains(self, _value: object) -> bool:
r"""Checks if list contains value given by function
Parameter
---------
value: :class:`object`
value
Returns
-------
bool
"""
if _value is None:
raise ArgumentNoneException(ExceptionArgument.value)
return self.where(lambda x: x == _value).count() > 0
def count(self, _func: Callable = None) -> int:
r"""Returns length of list or count of found elements
Parameter
---------
func: :class:`Callable`
selected value
Returns
-------
int
"""
if _func is None:
return self.__len__()
return self.where(_func).count()
def distinct(self, _func: Callable = None) -> QueryableABC:
r"""Returns list without redundancies
Parameter
---------
func: :class:`Callable`
selected value
Returns
-------
:class: `cpl.query.base.queryable_abc.QueryableABC`
"""
if _func is None:
_func = default_lambda
result = []
known_values = []
for element in self:
value = _func(element)
if value in known_values:
continue
known_values.append(value)
result.append(element)
return type(self)(self._type, result)
def element_at(self, _index: int) -> any:
r"""Returns element at given index
Parameter
---------
_index: :class:`int`
index
Returns
-------
Value at _index: any
"""
if _index is None:
raise ArgumentNoneException(ExceptionArgument.index)
if _index < 0 or _index >= self.count():
raise IndexOutOfRangeException
result = self._values[_index]
if result is None:
raise IndexOutOfRangeException
return result
def element_at_or_default(self, _index: int) -> Optional[any]:
r"""Returns element at given index or None
Parameter
---------
_index: :class:`int`
index
Returns
-------
Value at _index: Optional[any]
"""
if _index is None:
raise ArgumentNoneException(ExceptionArgument.index)
try:
return self._values[_index]
except IndexError:
return None
def first(self) -> any:
r"""Returns first element
Returns
-------
First element of list: any
"""
if self.count() == 0:
raise IndexOutOfRangeException()
return self._values[0]
def first_or_default(self) -> any:
r"""Returns first element or None
Returns
-------
First element of list: Optional[any]
"""
if self.count() == 0:
return None
return self._values[0]
def for_each(self, _func: Callable = None):
r"""Runs given function for each element of list
Parameter
---------
func: :class: `Callable`
function to call
"""
if _func is not None:
for element in self:
_func(element)
return self
def group_by(self, _func: Callable = None) -> QueryableABC:
r"""Groups by func
Returns
-------
Grouped list[list[any]]: any
"""
if _func is None:
_func = default_lambda
groups = {}
for v in self:
value = _func(v)
if v not in groups:
groups[value] = []
groups[value].append(v)
v = []
for g in groups.values():
v.append(type(self)(object, g))
x = type(self)(type(self), v)
return x
def last(self) -> any:
r"""Returns last element
Returns
-------
Last element of list: any
"""
if self.count() == 0:
raise IndexOutOfRangeException()
return self._values[self.count() - 1]
def last_or_default(self) -> any:
r"""Returns last element or None
Returns
-------
Last element of list: Optional[any]
"""
if self.count() == 0:
return None
return self._values[self.count() - 1]
def max(self, _func: Callable = None) -> object:
r"""Returns the highest value
Parameter
---------
func: :class:`Callable`
selected value
Returns
-------
object
"""
if _func is None and not is_number(self.type):
raise InvalidTypeException()
if _func is None:
_func = default_lambda
return _func(max(self, key=_func))
def median(self, _func=None) -> Union[int, float]:
r"""Return the median value of data elements
Returns
-------
Union[int, float]
"""
if _func is None:
_func = default_lambda
result = self.order_by(_func).select(_func).to_list()
length = len(result)
i = int(length / 2)
return result[i] if length % 2 == 1 else (float(result[i - 1]) + float(result[i])) / float(2)
def min(self, _func: Callable = None) -> object:
r"""Returns the lowest value
Parameter
---------
func: :class:`Callable`
selected value
Returns
-------
object
"""
if _func is None and not is_number(self.type):
raise InvalidTypeException()
if _func is None:
_func = default_lambda
return _func(min(self, key=_func))
def order_by(self, _func: Callable = None) -> "OrderedQueryableABC":
r"""Sorts elements by function in ascending order
Parameter
---------
func: :class:`Callable`
selected value
Returns
-------
:class: `cpl.query.base.ordered_queryable_abc.OrderedQueryableABC`
"""
if _func is None:
_func = default_lambda
from cpl.query.base.ordered_queryable import OrderedQueryable
return OrderedQueryable(self.type, sorted(self, key=_func), _func)
def order_by_descending(self, _func: Callable = None) -> "OrderedQueryableABC":
r"""Sorts elements by function in descending order
Parameter
---------
func: :class:`Callable`
selected value
Returns
-------
:class: `cpl.query.base.ordered_queryable_abc.OrderedQueryableABC`
"""
if _func is None:
_func = default_lambda
from cpl.query.base.ordered_queryable import OrderedQueryable
return OrderedQueryable(self.type, sorted(self, key=_func, reverse=True), _func)
def reverse(self) -> QueryableABC:
r"""Reverses list
Returns
-------
:class: `cpl.query.base.queryable_abc.QueryableABC`
"""
return type(self)(self._type, reversed(self._values))
def select(self, _func: Callable) -> QueryableABC:
r"""Formats each element of list to a given format
Returns
-------
:class: `cpl.query.base.queryable_abc.QueryableABC`
"""
if _func is None:
_func = default_lambda
_l = [_func(_o) for _o in self]
_t = type(_l[0]) if len(_l) > 0 else Any
return type(self)(_t, _l)
def select_many(self, _func: Callable) -> QueryableABC:
r"""Flattens resulting lists to one
Returns
-------
:class: `cpl.query.base.queryable_abc.QueryableABC`
"""
# The line below is pain. I don't understand anything of it...
# written on 09.11.2022 by Sven Heidemann
return type(self)(object, [_a for _o in self for _a in _func(_o)])
def single(self) -> any:
r"""Returns one single element of list
Returns
-------
Found value: any
Raises
------
ArgumentNoneException: when argument is None
Exception: when argument is None or found more than one element
"""
if self.count() > 1:
raise Exception("Found more than one element")
elif self.count() == 0:
raise Exception("Found no element")
return self._values[0]
def single_or_default(self) -> Optional[any]:
r"""Returns one single element of list
Returns
-------
Found value: Optional[any]
"""
if self.count() > 1:
raise Exception("Index out of range")
elif self.count() == 0:
return None
return self._values[0]
def skip(self, _index: int) -> QueryableABC:
r"""Skips all elements from index
Parameter
---------
_index: :class:`int`
index
Returns
-------
:class: `cpl.query.base.queryable_abc.QueryableABC`
"""
if _index is None:
raise ArgumentNoneException(ExceptionArgument.index)
return type(self)(self.type, self._values[_index:])
def skip_last(self, _index: int) -> QueryableABC:
r"""Skips all elements after index
Parameter
---------
_index: :class:`int`
index
Returns
-------
:class: `cpl.query.base.queryable_abc.QueryableABC`
"""
if _index is None:
raise ArgumentNoneException(ExceptionArgument.index)
index = self.count() - _index
return type(self)(self._type, self._values[:index])
def sum(self, _func: Callable = None) -> Union[int, float, complex]:
r"""Sum of all values
Parameter
---------
func: :class:`Callable`
selected value
Returns
-------
Union[int, float, complex]
"""
if _func is None and not is_number(self.type):
raise InvalidTypeException()
if _func is None:
_func = default_lambda
result = 0
for x in self:
result += _func(x)
return result
def split(self, _func: Callable) -> QueryableABC:
r"""Splits the list by given function
Parameter
---------
func: :class:`Callable`
seperator
Returns
-------
:class: `cpl.query.base.queryable_abc.QueryableABC`
"""
groups = []
group = []
for x in self:
v = _func(x)
if x == v:
groups.append(group)
group = []
group.append(x)
groups.append(group)
query_groups = []
for g in groups:
if len(g) == 0:
continue
query_groups.append(type(self)(self._type, g))
return type(self)(self._type, query_groups)
def take(self, _index: int) -> QueryableABC:
r"""Takes all elements from index
Parameter
---------
_index: :class:`int`
index
Returns
-------
:class: `cpl.query.base.queryable_abc.QueryableABC`
"""
if _index is None:
raise ArgumentNoneException(ExceptionArgument.index)
return type(self)(self._type, self._values[:_index])
def take_last(self, _index: int) -> QueryableABC:
r"""Takes all elements after index
Parameter
---------
_index: :class:`int`
index
Returns
-------
:class: `cpl.query.base.queryable_abc.QueryableABC`
"""
index = self.count() - _index
if index >= self.count() or index < 0:
raise IndexOutOfRangeException()
return type(self)(self._type, self._values[index:])
def where(self, _func: Callable = None) -> QueryableABC:
r"""Select element by function
Parameter
---------
func: :class:`Callable`
selected value
Returns
-------
:class: `cpl.query.base.queryable_abc.QueryableABC`
"""
if _func is None:
raise ArgumentNoneException(ExceptionArgument.func)
if _func is None:
_func = default_lambda
return type(self)(self.type, filter(_func, self))

View File

@@ -1,96 +0,0 @@
from abc import abstractmethod, ABC
from typing import Iterable
class Sequence(ABC):
@abstractmethod
def __init__(self, t: type, values: Iterable = None):
assert t is not None
assert isinstance(t, type) or t == any
assert values is None or isinstance(values, Iterable)
if values is None:
values = []
self._values = list(values)
if t is None:
t = object
self._type = t
def __iter__(self):
return iter(self._values)
def __next__(self):
return next(iter(self._values))
def __len__(self):
return self.to_list().__len__()
@classmethod
def __class_getitem__(cls, _t: type) -> type:
return _t
def __repr__(self):
return f"<{type(self).__name__} {self.to_list().__repr__()}>"
@property
def type(self) -> type:
return self._type
def _check_type(self, __object: any):
if self._type == any:
return
if (
self._type is not None
and type(__object) != self._type
and not isinstance(type(__object), self._type)
and not issubclass(type(__object), self._type)
):
raise Exception(f"Unexpected type: {type(__object)}\nExpected type: {self._type}")
def to_list(self) -> list:
r"""Converts :class: `cpl.query.base.sequence_abc.SequenceABC` to :class: `list`
Returns:
:class: `list`
"""
return [x for x in self._values]
def copy(self) -> "Sequence":
r"""Creates a copy of sequence
Returns:
Sequence
"""
return type(self)(self._type, self.to_list())
@classmethod
def empty(cls) -> "Sequence":
r"""Returns an empty sequence
Returns:
Sequence object that contains no elements
"""
return cls(object, [])
def index_of(self, _object: object) -> int:
r"""Returns the index of given element
Returns:
Index of object
Raises:
IndexError if object not in sequence
"""
for i, o in enumerate(self):
if o == _object:
return i
raise IndexError
@classmethod
def range(cls, start: int, length: int) -> "Sequence":
return cls(int, range(start, length))

View File

@@ -0,0 +1,173 @@
from itertools import islice, groupby
from typing import Generic, Callable, Iterable, Iterator, Dict, Tuple, Optional
from cpl.core.typing import T, R
from cpl.query.list import List
from cpl.query.typing import Predicate, K, Selector
class Collection(Generic[T]):
def __init__(self, source: Iterable[T]):
self._source = source
def __iter__(self) -> Iterator[T]:
return iter(self._source)
@property
def length(self) -> int:
return sum(1 for _ in self._source)
def where(self, f: Predicate) -> "Collection[T]":
return Collection([x for x in self._source if f(x)])
def select(self, f: Selector) -> "Collection[R]":
return Collection([f(x) for x in self._source])
def select_many(self, f: Callable[[T], Iterable[R]]) -> "Collection[R]":
return Collection([y for x in self._source for y in f(x)])
def take(self, count: int) -> "Collection[T]":
return Collection(islice(self._source, count))
def skip(self, count: int) -> "Collection[T]":
return Collection(islice(self._source, count, None))
def take_while(self, f: Predicate) -> "Collection[T]":
result = []
for x in self._source:
if f(x):
result.append(x)
else:
break
return Collection(result)
def skip_while(self, f: Predicate) -> "Collection[T]":
it = iter(self._source)
for x in it:
if not f(x):
return Collection([x] + list(it))
return Collection([])
def distinct(self) -> "Collection[T]":
seen = set()
return Collection([x for x in self._source if not (x in seen or seen.add(x))])
def union(self, other: Iterable[T]) -> "Collection[T]":
return self.concat(other).distinct()
def intersect(self, other: Iterable[T]) -> "Collection[T]":
other_set = set(other)
return Collection([x for x in self._source if x in other_set])
def except_(self, other: Iterable[T]) -> "Collection[T]":
other_set = set(other)
return Collection([x for x in self._source if x not in other_set])
def concat(self, other: Iterable[T]) -> "Collection[T]":
return Collection(self._source) + list(other)
def count(self) -> int:
return len(list(self._source))
def sum(self, f: Optional[Selector] = None) -> R:
return sum([f(x) for x in self._source]) if f else sum(self._source) # type: ignore
def min(self, f: Optional[Selector] = None) -> R:
return min([f(x) for x in self._source]) if f else min(self._source) # type: ignore
def max(self, f: Optional[Selector] = None) -> R:
return max([f(x) for x in self._source]) if f else max(self._source) # type: ignore
def average(self, f: Optional[Callable[[T], float]] = None) -> float:
values = [f(x) for x in self._source] if f else list(self._source)
return sum(values) / len(values) if values else 0.0
def aggregate(self, func: Callable[[R, T], R], seed: Optional[R] = None) -> R:
it = iter(self._source)
if seed is None:
acc = next(it) # type: ignore
else:
acc = seed
for x in it:
acc = func(acc, x)
return acc
def any(self, f: Optional[Predicate] = None) -> bool:
return any(f(x) if f else x for x in self._source)
def all(self, f: Predicate) -> bool:
return all(f(x) for x in self._source)
def contains(self, value: T) -> bool:
return value in self._source
def sequence_equal(self, other: Iterable[T]) -> bool:
return list(self._source) == list(other)
def group_by(self, key_f: Callable[[T], K]) -> "Collection[Tuple[K, List[T]]]":
sorted_data = sorted(self._source, key=key_f)
return Collection([(key, list(group)) for key, group in groupby(sorted_data, key=key_f)])
def join(
self, inner: Iterable[R], outer_key: Callable[[T], K], inner_key: Callable[[R], K], result: Callable[[T, R], R]
) -> "Collection[R]":
lookup: Dict[K, List[R]] = {}
for i in inner:
k = inner_key(i)
lookup.setdefault(k, []).append(i)
return Collection([result(o, i) for o in self._source for i in lookup.get(outer_key(o), [])])
def first(self, f: Optional[Predicate] = None) -> T:
if f:
for x in self._source:
if f(x):
return x
raise ValueError("No matching element")
return next(iter(self._source))
def first_or_default(self, default: Optional[T] = None) -> Optional[T]:
return next(iter(self._source), default)
def last(self) -> T:
return list(self._source)[-1]
def single(self, f: Optional[Predicate] = None) -> T:
items = [x for x in self._source if f(x)] if f else list(self._source)
if len(items) != 1:
raise ValueError("Sequence does not contain exactly one element")
return items[0]
def to_list(self) -> List[T]:
return List(self._source)
def to_enumerable(self) -> "Enumerable[T]":
from cpl.query.enumerable import Enumerable
return Enumerable(self._source)
def to_dict(self, key_f: Callable[[T], K], value_f: Selector) -> Dict[K, R]:
return {key_f(x): value_f(x) for x in self._source}
def cast(self, t: Selector) -> "Collection[R]":
return Collection([t(x) for x in self._source])
def of_type(self, t: type) -> "Collection[T]":
return Collection([x for x in self._source if isinstance(x, t)])
def reverse(self) -> "Collection[T]":
return Collection(reversed(list(self._source)))
def zip(self, other: Iterable[R]) -> "Collection[Tuple[T, R]]":
return Collection(zip(self._source, other))
@staticmethod
def range(start: int, count: int) -> "Collection[int]":
return Collection(range(start, start + count))
@staticmethod
def repeat(value: T, count: int) -> "Collection[T]":
return Collection([value for _ in range(count)])
@staticmethod
def empty() -> "Collection[T]":
return Collection([])

View File

@@ -0,0 +1,207 @@
from itertools import islice, groupby, chain
from typing import Generic, Callable, Iterable, Iterator, Dict, Tuple, Optional
from cpl.core.typing import T, R
from cpl.query.typing import Predicate, K, Selector
class Enumerable(Generic[T]):
def __init__(self, source: Iterable[T]):
self._source = source
def __iter__(self) -> Iterator[T]:
return iter(self._source)
@property
def length(self) -> int:
return len(list(self._source))
def where(self, f: Predicate) -> "Enumerable[T]":
return Enumerable(x for x in self._source if f(x))
def select(self, f: Selector) -> "Enumerable[R]":
return Enumerable(f(x) for x in self._source)
def select_many(self, f: Callable[[T], Iterable[R]]) -> "Enumerable[R]":
return Enumerable(y for x in self._source for y in f(x))
def take(self, count: int) -> "Enumerable[T]":
return Enumerable(islice(self._source, count))
def skip(self, count: int) -> "Enumerable[T]":
return Enumerable(islice(self._source, count, None))
def take_while(self, f: Predicate) -> "Enumerable[T]":
def generator():
for x in self._source:
if f(x):
yield x
else:
break
return Enumerable(generator())
def skip_while(self, f: Predicate) -> "Enumerable[T]":
def generator():
it = iter(self._source)
for x in it:
if not f(x):
yield x
break
yield from it
return Enumerable(generator())
def distinct(self) -> "Enumerable[T]":
def generator():
seen = set()
for x in self._source:
if x not in seen:
seen.add(x)
yield x
return Enumerable(generator())
def union(self, other: Iterable[T]) -> "Enumerable[T]":
return Enumerable(chain(self.distinct(), Enumerable(other).distinct())).distinct()
def intersect(self, other: Iterable[T]) -> "Enumerable[T]":
other_set = set(other)
return Enumerable(x for x in self._source if x in other_set)
def except_(self, other: Iterable[T]) -> "Enumerable[T]":
other_set = set(other)
return Enumerable(x for x in self._source if x not in other_set)
def concat(self, other: Iterable[T]) -> "Enumerable[T]":
return Enumerable(chain(self._source, other))
# --- Aggregation ---
def count(self) -> int:
return sum(1 for _ in self._source)
def sum(self, f: Optional[Selector] = None) -> R:
if f:
return sum(f(x) for x in self._source)
return sum(self._source) # type: ignore
def min(self, f: Optional[Selector] = None) -> R:
if f:
return min(f(x) for x in self._source)
return min(self._source) # type: ignore
def max(self, f: Optional[Selector] = None) -> R:
if f:
return max(f(x) for x in self._source)
return max(self._source) # type: ignore
def average(self, f: Optional[Callable[[T], float]] = None) -> float:
values = list(self.select(f).to_list()) if f else list(self._source)
return sum(values) / len(values) if values else 0.0
def aggregate(self, func: Callable[[R, T], R], seed: Optional[R] = None) -> R:
it = iter(self._source)
if seed is None:
acc = next(it) # type: ignore
else:
acc = seed
for x in it:
acc = func(acc, x)
return acc
def any(self, f: Optional[Predicate] = None) -> bool:
return any(f(x) if f else x for x in self._source)
def all(self, f: Predicate) -> bool:
return all(f(x) for x in self._source)
def contains(self, value: T) -> bool:
return any(x == value for x in self._source)
def sequence_equal(self, other: Iterable[T]) -> bool:
return list(self._source) == list(other)
def group_by(self, key_f: Callable[[T], K]) -> "Enumerable[Tuple[K, List[T]]]":
def generator():
sorted_data = sorted(self._source, key=key_f)
for key, group in groupby(sorted_data, key=key_f):
yield (key, list(group))
return Enumerable(generator())
def join(
self, inner: Iterable[R], outer_key: Callable[[T], K], inner_key: Callable[[R], K], result: Callable[[T, R], R]
) -> "Enumerable[R]":
def generator():
lookup: Dict[K, List[R]] = {}
for i in inner:
k = inner_key(i)
lookup.setdefault(k, []).append(i)
for o in self._source:
k = outer_key(o)
if k in lookup:
for i in lookup[k]:
yield result(o, i)
return Enumerable(generator())
def first(self, f: Optional[Predicate] = None) -> T:
if f:
for x in self._source:
if f(x):
return x
raise ValueError("No matching element")
return next(iter(self._source))
def first_or_default(self, default: Optional[T] = None) -> Optional[T]:
return next(iter(self._source), default)
def last(self) -> T:
return list(self._source)[-1]
def single(self, f: Optional[Predicate] = None) -> T:
items = [x for x in self._source if f(x)] if f else list(self._source)
if len(items) != 1:
raise ValueError("Sequence does not contain exactly one element")
return items[0]
def to_list(self) -> "List[T]":
from cpl.query.list import List
return List(self._source)
def to_collection(self) -> "Collection[T]":
from cpl.query.collection import Collection
return Collection(self._source)
def to_set(self) -> "Set[T]":
from cpl.query.set import Set
return Set(self._source)
def to_dict(self, key_f: Callable[[T], K], value_f: Selector) -> Dict[K, R]:
return {key_f(x): value_f(x) for x in self._source}
def cast(self, t: Selector) -> "Enumerable[R]":
return Enumerable(t(x) for x in self._source)
def of_type(self, t: type) -> "Enumerable[T]":
return Enumerable(x for x in self._source if isinstance(x, t))
def reverse(self) -> "Enumerable[T]":
return Enumerable(reversed(list(self._source)))
def zip(self, other: Iterable[R]) -> "Enumerable[Tuple[T, R]]":
return Enumerable(zip(self._source, other))
@staticmethod
def range(start: int, count: int) -> "Enumerable[int]":
return Enumerable(range(start, start + count))
@staticmethod
def repeat(value: T, count: int) -> "Enumerable[T]":
return Enumerable(value for _ in range(count))
@staticmethod
def empty() -> "Enumerable[T]":
return Enumerable([])

View File

@@ -1,2 +0,0 @@
from .enumerable import Enumerable
from .enumerable_abc import EnumerableABC

View File

@@ -1,12 +0,0 @@
from cpl.query.enumerable.enumerable_abc import EnumerableABC
def _default_lambda(x: object):
return x
class Enumerable(EnumerableABC):
r"""Implementation of :class: `cpl.query.enumerable.enumerable_abc.EnumerableABC`"""
def __init__(self, t: type = None, values: list = None):
EnumerableABC.__init__(self, t, values)

View File

@@ -1,21 +0,0 @@
from abc import abstractmethod
from cpl.query.base.queryable_abc import QueryableABC
class EnumerableABC(QueryableABC):
r"""ABC to define functions on list"""
@abstractmethod
def __init__(self, t: type = None, values: list = None):
QueryableABC.__init__(self, t, values)
def to_iterable(self) -> "IterableABC":
r"""Converts :class: `cpl.query.enumerable.enumerable_abc.EnumerableABC` to :class: `cpl.query.iterable.iterable_abc.IterableABC`
Returns:
:class: `cpl.query.iterable.iterable_abc.IterableABC`
"""
from cpl.query.iterable.iterable import Iterable
return Iterable(self._type, self.to_list())

View File

@@ -1,33 +0,0 @@
from enum import Enum
# models
class ExceptionArgument(Enum):
list = "list"
func = "func"
type = "type"
value = "value"
index = "index"
# exceptions
class ArgumentNoneException(Exception):
r"""Exception when argument is None"""
def __init__(self, arg: ExceptionArgument):
Exception.__init__(self, f"argument {arg} is None")
class IndexOutOfRangeException(Exception):
r"""Exception when index is out of range"""
def __init__(self, err: str = None):
Exception.__init__(self, f"List index out of range" if err is None else err)
class InvalidTypeException(Exception):
r"""Exception when type is invalid"""
class WrongTypeException(Exception):
r"""Exception when type is unexpected"""

View File

@@ -1 +0,0 @@
from .list import List

View File

@@ -1,36 +0,0 @@
from cpl.query.enumerable.enumerable_abc import EnumerableABC
from cpl.query.iterable.iterable import Iterable
class List(Iterable):
r"""Implementation of :class: `cpl.query.extension.iterable.Iterable`"""
def __init__(self, t: type = None, values: Iterable = None):
Iterable.__init__(self, t, values)
def __getitem__(self, *args):
return self._values.__getitem__(*args)
def __setitem__(self, *args):
self._values.__setitem__(*args)
def __delitem__(self, *args):
self._values.__delitem__(*args)
def to_enumerable(self) -> EnumerableABC:
r"""Converts :class: `cpl.query.iterable.iterable_abc.IterableABC` to :class: `cpl.query.enumerable.enumerable_abc.EnumerableABC`
Returns:
:class: `cpl.query.enumerable.enumerable_abc.EnumerableABC`
"""
from cpl.query.enumerable.enumerable import Enumerable
return Enumerable(self._type, self.to_list())
def to_iterable(self) -> Iterable:
r"""Converts :class: `cpl.query.enumerable.enumerable_abc.EnumerableABC` to :class: `cpl.query.iterable.iterable_abc.IterableABC`
Returns:
:class: `cpl.query.iterable.iterable_abc.IterableABC`
"""
return Iterable(self._type, self.to_list())

View File

@@ -0,0 +1,45 @@
from typing import Generic, Iterable, Iterator, Optional
from cpl.core.typing import T
from cpl.query.enumerable import Enumerable
class ImmutableList(Generic[T], Enumerable[T]):
def __init__(self, source: Optional[Iterable[T]] = None):
if source is None:
source = []
Enumerable.__init__(self, source)
@property
def _items(self) -> list[T]:
return list(self._source)
def __iter__(self) -> Iterator[T]:
return iter(self._items)
def __len__(self) -> int:
return len(self._items)
def __getitem__(self, index: int) -> T:
return self._items[index]
def __contains__(self, item: T) -> bool:
return item in self._items
def __repr__(self) -> str:
return f"List({self._items!r})"
@property
def length(self) -> int:
return len(self._items)
def to_enumerable(self) -> "Enumerable[T]":
from cpl.query.enumerable import Enumerable
return Enumerable(self._items)
def to_collection(self) -> "Collection[T]":
from cpl.query.collection import Collection
return Collection(self._items)

View File

@@ -0,0 +1,44 @@
from typing import Generic, Iterable, Iterator, Optional
from cpl.core.typing import T
from cpl.query.enumerable import Enumerable
class ImmutableSet(Generic[T], Enumerable[T]):
def __init__(self, source: Optional[Iterable[T]] = None):
if source is None:
source = set()
self.__source = source
Enumerable.__init__(self, [])
@property
def _source(self) -> set[T]:
return self.__source
@_source.setter
def _source(self, value: set[T]) -> None:
if not isinstance(value, set):
value = set(value)
self.__source = value
def __iter__(self) -> Iterator[T]:
return iter(self._source)
def __len__(self) -> int:
return len(self._source)
def __contains__(self, item: T) -> bool:
return item in self._source
def __repr__(self) -> str:
return f"Set({self._source!r})"
@property
def length(self) -> int:
return len(self._source)
def to_enumerable(self) -> "Enumerable[T]":
from cpl.query.enumerable import Enumerable
return Enumerable(self._source)

View File

@@ -1,2 +0,0 @@
from .iterable_abc import IterableABC
from .iterable import Iterable

View File

@@ -1,8 +0,0 @@
from typing import Iterable as TIterable
from cpl.query.iterable.iterable_abc import IterableABC
class Iterable(IterableABC):
def __init__(self, t: type = None, values: TIterable = None):
IterableABC.__init__(self, t, values)

View File

@@ -1,71 +0,0 @@
from abc import abstractmethod
from typing import Iterable
from cpl.query.base.queryable_abc import QueryableABC
class IterableABC(QueryableABC):
r"""ABC to define functions on list"""
@abstractmethod
def __init__(self, t: type = None, values: Iterable = None):
QueryableABC.__init__(self, t, values)
def __str__(self):
return str(self.to_list())
def append(self, _object: object):
self.add(_object)
def add(self, _object: object):
r"""Adds element to list
Parameter:
_object: :class:`object`
value
"""
self._check_type(_object)
self._values.append(_object)
def extend(self, __iterable: Iterable) -> "IterableABC":
r"""Adds elements of given list to list
Parameter:
__iterable: :class: `cpl.query.extension.iterable.Iterable`
index
"""
for value in __iterable:
self.append(value)
return self
def remove(self, _object: object):
r"""Removes element from list
Parameter:
_object: :class:`object`
value
"""
if _object not in self:
raise ValueError
self._values.remove(_object)
def remove_at(self, _index: int):
r"""Removes element from list
Parameter:
_object: :class:`object`
value
"""
self._values.pop(_index)
def to_enumerable(self) -> "EnumerableABC":
r"""Converts :class: `cpl.query.iterable.iterable_abc.IterableABC` to :class: `cpl.query.enumerable.enumerable_abc.EnumerableABC`
Returns:
:class: `cpl.query.enumerable.enumerable_abc.EnumerableABC`
"""
from cpl.query.enumerable.enumerable import Enumerable
return Enumerable(self._type, self.to_list())

View File

@@ -0,0 +1,66 @@
from typing import Generic, Iterable, Iterator, Optional
from cpl.core.typing import T
from cpl.query.enumerable import Enumerable
class List(Generic[T], Enumerable[T]):
def __init__(self, source: Optional[Iterable[T]] = None):
if source is None:
source = []
Enumerable.__init__(self, source)
@property
def _items(self) -> list[T]:
return list(self._source)
def __iter__(self) -> Iterator[T]:
return iter(self._items)
def __len__(self) -> int:
return len(self._items)
def __getitem__(self, index: int) -> T:
return self._items[index]
def __setitem__(self, index: int, value: T) -> None:
self._items[index] = value
def __contains__(self, item: T) -> bool:
return item in self._items
def __repr__(self) -> str:
return f"List({self._items!r})"
@property
def length(self) -> int:
return len(self._items)
def add(self, item: T) -> None:
self._items.append(item)
def extend(self, items: Iterable[T]) -> None:
self._items.extend(items)
def insert(self, index: int, item: T) -> None:
self._items.insert(index, item)
def remove(self, item: T) -> None:
self._items.remove(item)
def pop(self, index: int = -1) -> T:
return self._items.pop(index)
def clear(self) -> None:
self._items.clear()
def to_enumerable(self) -> "Enumerable[T]":
from cpl.query.enumerable import Enumerable
return Enumerable(self._items)
def to_collection(self) -> "Collection[T]":
from cpl.query.collection import Collection
return Collection(self._items)

View File

@@ -0,0 +1,59 @@
from typing import Protocol, Callable, Dict, Tuple, Optional, Iterable
from cpl.core.typing import T, R
from cpl.query.list import List
from cpl.query.typing import Selector, Predicate, K
class Sequence(Protocol[T]):
def select(self, f: Selector) -> "Sequence[R]": ...
def where(self, f: Predicate) -> "Sequence[T]": ...
def select_many(self, f: Callable[[T], Iterable[R]]) -> "Sequence[R]": ...
def take(self, count: int) -> "Sequence[T]": ...
def skip(self, count: int) -> "Sequence[T]": ...
def take_while(self, f: Predicate) -> "Sequence[T]": ...
def skip_while(self, f: Predicate) -> "Sequence[T]": ...
def distinct(self) -> "Sequence[T]": ...
def union(self, other: Iterable[T]) -> "Sequence[T]": ...
def intersect(self, other: Iterable[T]) -> "Sequence[T]": ...
def except_(self, other: Iterable[T]) -> "Sequence[T]": ...
def concat(self, other: Iterable[T]) -> "Sequence[T]": ...
def count(self) -> int: ...
def sum(self, f: Optional[Selector] = None) -> R: ...
def min(self, f: Optional[Selector] = None) -> R: ...
def max(self, f: Optional[Selector] = None) -> R: ...
def average(self, f: Optional[Callable[[T], float]] = None) -> float: ...
def aggregate(self, func: Callable[[R, T], R], seed: Optional[R] = None) -> R: ...
def any(self, f: Optional[Predicate] = None) -> bool: ...
def all(self, f: Predicate) -> bool: ...
def contains(self, value: T) -> bool: ...
def sequence_equal(self, other: Iterable[T]) -> bool: ...
def group_by(self, key_f: Callable[[T], K]) -> "Sequence[Tuple[K, List[T]]]": ...
def join(
self, inner: Iterable[R], outer_key: Callable[[T], K], inner_key: Callable[[R], K], result: Callable[[T, R], R]
) -> "Sequence[R]": ...
def first(self, f: Optional[Predicate] = None) -> T: ...
def first_or_default(self, default: Optional[T] = None) -> Optional[T]: ...
def last(self) -> T: ...
def single(self, f: Optional[Predicate] = None) -> T: ...
def to_list(self) -> List[T]: ...
def to_dict(self, key_f: Callable[[T], K], value_f: Selector) -> Dict[K, R]: ...
def cast(self, t: Selector) -> "Sequence[R]": ...
def of_type(self, t: type) -> "Sequence[T]": ...
def reverse(self) -> "Sequence[T]": ...
def zip(self, other: Iterable[R]) -> "Sequence[Tuple[T, R]]": ...
@staticmethod
def range(start: int, count: int) -> "Sequence[int]": ...
@staticmethod
def repeat(value: T, count: int) -> "Sequence[T]": ...
@staticmethod
def empty() -> "Sequence[T]": ...

View File

@@ -0,0 +1,53 @@
from typing import Generic, Iterable, Iterator, Optional
from cpl.core.typing import T
from cpl.query.enumerable import Enumerable
class Set(Generic[T], Enumerable[T]):
def __init__(self, source: Optional[Iterable[T]] = None):
if source is None:
source = set()
self.__source = source
Enumerable.__init__(self, [])
@property
def _source(self) -> set[T]:
return self.__source
@_source.setter
def _source(self, value: set[T]) -> None:
if not isinstance(value, set):
value = set(value)
self.__source = value
def __iter__(self) -> Iterator[T]:
return iter(self._source)
def __len__(self) -> int:
return len(self._source)
def __contains__(self, item: T) -> bool:
return item in self._source
def __repr__(self) -> str:
return f"Set({self._source!r})"
@property
def length(self) -> int:
return len(self._source)
def add(self, item: T) -> None:
self._source.add(item)
def remove(self, item: T) -> None:
self._source.remove(item)
def clear(self) -> None:
self._source.clear()
def to_enumerable(self) -> "Enumerable[T]":
from cpl.query.enumerable import Enumerable
return Enumerable(self._source)

View File

@@ -0,0 +1,8 @@
from typing import Callable, TypeVar
from cpl.core.typing import T, R
K = TypeVar("K")
Predicate = Callable[[T], bool]
Selector = Callable[[T], R]

View File

@@ -5,7 +5,7 @@ from random import randint
from cpl.core.utils import String from cpl.core.utils import String
from cpl.query.exceptions import InvalidTypeException, ArgumentNoneException from cpl.query.exceptions import InvalidTypeException, ArgumentNoneException
from cpl.query.extension.list import List from cpl.query.extension.list import List
from cpl.query.iterable import Iterable from cpl.query.collection import Iterable
from unittests_query.models import User, Address from unittests_query.models import User, Address

View File

@@ -3,7 +3,7 @@ import timeit
import unittest import unittest
from cpl.query.enumerable import Enumerable from cpl.query.enumerable import Enumerable
from cpl.query.iterable import Iterable from cpl.query.collection import Iterable
VALUES = 10000 VALUES = 10000
COUNT = 50 COUNT = 50

View File

@@ -2,7 +2,7 @@ import unittest
from cpl.query.enumerable import Enumerable from cpl.query.enumerable import Enumerable
from cpl.query.extension.list import List from cpl.query.extension.list import List
from cpl.query.iterable import Iterable from cpl.query.collection import Iterable
class SequenceTestCase(unittest.TestCase): class SequenceTestCase(unittest.TestCase):