2022.10.9 - Enumerable & List (#105) #106

Merged
edraft merged 9 commits from 2022.10.9 into 2022.10 2022-09-15 18:21:25 +02:00
21 changed files with 1339 additions and 704 deletions
Showing only changes of commit 70652aeb4c - Show all commits

View File

@ -5,7 +5,7 @@ from typing import Optional, Callable, Union
class QueryableABC(ABC):
@abstractmethod
def all(self, func: Callable) -> bool:
def all(self, _func: Callable) -> bool:
r"""Checks if every element of list equals result found by function
Parameter
@ -20,7 +20,7 @@ class QueryableABC(ABC):
pass
@abstractmethod
def any(self, func: Callable) -> bool:
def any(self, _func: Callable) -> bool:
r"""Checks if list contains result found by function
Parameter
@ -35,7 +35,7 @@ class QueryableABC(ABC):
pass
@abstractmethod
def average(self, func: Callable = None) -> Union[int, float, complex]:
def average(self, _func: Callable) -> Union[int, float, complex]:
r"""Returns average value of list
Parameter
@ -65,7 +65,7 @@ class QueryableABC(ABC):
pass
@abstractmethod
def count(self, func: Callable = None) -> int:
def count(self, _func: Callable = None) -> int:
r"""Returns length of list or count of found elements
Parameter
@ -80,7 +80,7 @@ class QueryableABC(ABC):
pass
@abstractmethod
def distinct(self, func: Callable = None) -> 'QueryableABC':
def distinct(self, _func: Callable = None) -> 'QueryableABC':
r"""Returns list without redundancies
Parameter
@ -124,26 +124,6 @@ class QueryableABC(ABC):
"""
pass
@abstractmethod
def last(self) -> any:
r"""Returns last element
Returns
-------
Last element of list: any
"""
pass
@abstractmethod
def last_or_default(self) -> any:
r"""Returns last element or None
Returns
-------
Last element of list: Optional[any]
"""
pass
@abstractmethod
def first(self) -> any:
r"""Returns first element
@ -165,7 +145,7 @@ class QueryableABC(ABC):
pass
@abstractmethod
def for_each(self, func: Callable):
def for_each(self, _func: Callable):
r"""Runs given function for each element of list
Parameter
@ -176,8 +156,27 @@ class QueryableABC(ABC):
pass
@abstractmethod
def max(self, func: Callable = None) -> Union[int, float, complex]:
r"""Returns highest value
def last(self) -> any:
r"""Returns last element
Returns
-------
Last element of list: any
"""
pass
@abstractmethod
def last_or_default(self) -> any:
r"""Returns last element or None
Returns
-------
Last element of list: Optional[any]
"""
pass
@abstractmethod
def max(self, _func: Callable) -> Union[int, float, complex]:
r"""Returns the highest value
Parameter
---------
@ -191,8 +190,19 @@ class QueryableABC(ABC):
pass
@abstractmethod
def min(self, func: Callable = None) -> Union[int, float, complex]:
r"""Returns highest value
def median(self) -> Union[int, float]:
r"""Return the median value of data elements
Returns
-------
Union[int, float]
"""
pass
@abstractmethod
def min(self, _func: Callable) -> Union[int, float, complex]:
r"""Returns the lowest value
Parameter
---------
@ -206,7 +216,7 @@ class QueryableABC(ABC):
pass
@abstractmethod
def order_by(self, func: Callable) -> 'QueryableABC':
def order_by(self, _func: Callable) -> 'QueryableABC':
r"""Sorts elements by function in ascending order
Parameter
@ -221,7 +231,7 @@ class QueryableABC(ABC):
pass
@abstractmethod
def order_by_descending(self, func: Callable) -> 'QueryableABC':
def order_by_descending(self, _func: Callable) -> 'QueryableABC':
r"""Sorts elements by function in descending order
Parameter
@ -321,7 +331,7 @@ class QueryableABC(ABC):
pass
@abstractmethod
def sum(self, func: Callable = None) -> Union[int, float, complex]:
def sum(self, _func: Callable) -> Union[int, float, complex]:
r"""Sum of all values
Parameter
@ -366,7 +376,7 @@ class QueryableABC(ABC):
pass
@abstractmethod
def where(self, func: Callable) -> 'QueryableABC':
def where(self, _func: Callable) -> 'QueryableABC':
r"""Select element by function
Parameter

View File

@ -0,0 +1,50 @@
from abc import abstractmethod, ABC
from typing import Union
from cpl_query.base.sequence_values import SequenceValues
class SequenceABC(ABC):
@abstractmethod
def __init__(self, t: type = None, values: Union[list, iter] = None):
ABC.__init__(self)
if t == any:
t = None
elif t is None and values is not None:
t = type(values[0])
self._type = t
self._values = SequenceValues(values, t)
def __len__(self):
return len(self._values)
def __iter__(self):
return iter(self._values)
def next(self):
return next(self._values)
def __next__(self):
return self.next()
def __repr__(self):
return f'<{type(self).__name__} {list(self).__repr__()}>'
@property
def type(self) -> type:
return self._type
def reset(self):
self._values.reset()
def to_list(self) -> list:
r"""Converts :class: `cpl_query.enumerable.enumerable_abc.EnumerableABC` to :class: `list`
Returns
-------
:class: `list`
"""
return [x for x in self]

View File

@ -0,0 +1,55 @@
import io
import itertools
from cpl_query.exceptions import IndexOutOfRangeException
class SequenceEnd:
def __init__(self):
self.is_ended = False
def set_end(self, value: bool) -> 'SequenceEnd':
self.is_ended = value
return self
class SequenceValues:
def __init__(self, data, _t: type):
if data is None:
data = []
if not hasattr(data, '__iter__'):
raise TypeError(f'{type(self).__name__} must be instantiated with an iterable object')
self._data = []
for element in data:
if _t is not None and type(element) != _t and not isinstance(type(element), _t) and not issubclass(type(element), _t):
raise Exception(f'Unexpected type: {type(element)}\nExpected type: {_t}')
self._data.append(element)
self._index = 0
self._len = sum(1 for item in self._data)
self._cycle = itertools.cycle(self._data)
def __len__(self):
return self._len
def __iter__(self):
i = 0
while i < len(self):
yield next(self._cycle)
i += 1
def __next__(self):
if self._index >= len(self):
raise IndexOutOfRangeException()
self._index += 1
return self.next()
def next(self):
return next(self._cycle)
def reset(self):
self._index = 0
self._cycle = itertools.cycle(self._data)

View File

@ -4,7 +4,7 @@
"Version": {
"Major": "2022",
"Minor": "10",
"Micro": "2"
"Micro": "9"
},
"Author": "Sven Heidemann",
"AuthorEmail": "sven.heidemann@sh-edraft.de",

View File

@ -0,0 +1,333 @@
from typing import Union, Callable, Optional, Iterable
from cpl_query._helper import is_number
from cpl_query.base.sequence_values import SequenceValues
from cpl_query.enumerable.enumerable_abc import EnumerableABC
from cpl_query.enumerable.ordered_enumerable_abc import OrderedEnumerableABC
from cpl_query.exceptions import ArgumentNoneException, ExceptionArgument, InvalidTypeException, IndexOutOfRangeException
def _default_lambda(x: object):
return x
class Enumerable(EnumerableABC):
r"""Implementation of :class: `cpl_query.enumerable.enumerable_abc.EnumerableABC`
"""
def __init__(self, t: type = None, values: Union[list, iter] = None):
EnumerableABC.__init__(self, t, values)
def all(self, _func: Callable = None) -> bool:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _func is None:
_func = _default_lambda
result = self.where(_func)
return len(result) == len(self)
def any(self, _func: Callable = None) -> bool:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _func is None:
_func = _default_lambda
result = self.where(_func)
return len(result) > 0
def average(self, _func: Callable = None) -> Union[int, float, complex]:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _func is None and not is_number(self.type):
raise InvalidTypeException()
if _func is None:
_func = _default_lambda
return float(self.sum(_func)) / float(self.count())
def contains(self, _value: object) -> bool:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _value is None:
raise ArgumentNoneException(ExceptionArgument.value)
return _value in self
def count(self, _func: Callable = None) -> int:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _func is None:
return len(self)
return len(self.where(_func))
def distinct(self, _func: Callable = None) -> EnumerableABC:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _func is None:
_func = _default_lambda
result = Enumerable()
known_values = []
for element in self:
value = _func(element)
if value in known_values:
continue
known_values.append(value)
result.add(element)
return result
def element_at(self, _index: int) -> any:
self.reset()
while _index >= 0:
current = self.next()
if _index == 0:
return current
_index -= 1
def element_at_or_default(self, _index: int) -> any:
try:
return self.element_at(_index)
except IndexOutOfRangeException:
return None
@staticmethod
def empty() -> 'EnumerableABC':
r"""Returns an empty enumerable
Returns
-------
Enumerable object that contains no elements
"""
return Enumerable()
def first(self: EnumerableABC, _func=None) -> any:
if _func is not None:
return self.where(_func).element_at(0)
return self.element_at(0)
def first_or_default(self: EnumerableABC, _func=None) -> Optional[any]:
if _func is not None:
return self.where(_func).element_at_or_default(0)
return self.element_at_or_default(0)
def for_each(self, _func: Callable = None):
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _func is None:
_func = _default_lambda
for element in self:
_func(element)
def last(self: EnumerableABC) -> any:
return self.element_at(self.count() - 1)
def last_or_default(self: EnumerableABC) -> Optional[any]:
return self.element_at_or_default(self.count() - 1)
def max(self, _func: Callable = None) -> Union[int, float, complex]:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _func is None and not is_number(self.type):
raise InvalidTypeException()
if _func is None:
_func = _default_lambda
return _func(max(self, key=_func))
def median(self, _func=None) -> Union[int, float]:
if _func is None:
_func = _default_lambda
result = self.order_by(_func).select(_func).to_list()
length = len(result)
i = int(length / 2)
return (
result[i]
if length % 2 == 1
else (float(result[i - 1]) + float(result[i])) / float(2)
)
def min(self, _func: Callable = None) -> Union[int, float, complex]:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _func is None and not is_number(self.type):
raise InvalidTypeException()
if _func is None:
_func = _default_lambda
return _func(min(self, key=_func))
def order_by(self, _func: Callable = None) -> OrderedEnumerableABC:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _func is None:
_func = _default_lambda
from cpl_query.enumerable.ordered_enumerable import OrderedEnumerable
return OrderedEnumerable(self.type, _func, sorted(self, key=_func))
def order_by_descending(self, _func: Callable = None) -> OrderedEnumerableABC:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _func is None:
_func = _default_lambda
from cpl_query.enumerable.ordered_enumerable import OrderedEnumerable
return OrderedEnumerable(self.type, _func, sorted(self, key=_func, reverse=True))
@staticmethod
def range(start: int, length: int) -> 'EnumerableABC':
return Enumerable(int, range(start, start + length, 1))
def reverse(self: EnumerableABC) -> EnumerableABC:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
return Enumerable(self.type, list(reversed(self.to_list())))
def select(self, _func: Callable = None) -> EnumerableABC:
if _func is None:
_func = _default_lambda
result = Enumerable()
result.extend(_func(_o) for _o in self)
return result
def select_many(self, _func: Callable = None) -> EnumerableABC:
if _func is None:
_func = _default_lambda
result = Enumerable()
# The line below is pain. I don't understand anything of it...
# written on 09.11.2022 by Sven Heidemann
elements = [_a for _o in self for _a in _func(_o)]
result.extend(elements)
return result
def single(self: EnumerableABC) -> any:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if len(self) > 1:
raise IndexError('Found more than one element')
elif len(self) == 0:
raise IndexOutOfRangeException(f'{type(self).__name__} is empty')
return self.element_at(0)
def single_or_default(self: EnumerableABC) -> Optional[any]:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if len(self) > 1:
raise IndexError('Found more than one element')
elif len(self) == 0:
return None
return self.element_at(0)
def skip(self, _index: int) -> EnumerableABC:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _index is None:
raise ArgumentNoneException(ExceptionArgument.index)
_list = self.to_list()
if _index >= len(_list):
raise IndexOutOfRangeException()
return Enumerable(self.type, _list[_index:])
def skip_last(self, _index: int) -> EnumerableABC:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _index is None:
raise ArgumentNoneException(ExceptionArgument.index)
index = len(self) - _index
if index >= len(self) or index < 0:
raise IndexOutOfRangeException()
return self.take(len(self) - _index)
def take(self, _index: int) -> EnumerableABC:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _index is None:
raise ArgumentNoneException(ExceptionArgument.index)
_list = self.to_list()
if _index >= len(_list):
raise IndexOutOfRangeException()
return Enumerable(self.type, _list[:_index])
def take_last(self, _index: int) -> EnumerableABC:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _index is None:
raise ArgumentNoneException(ExceptionArgument.index)
_list = self.to_list()
index = len(_list) - _index
if index >= len(_list) or index < 0:
raise IndexOutOfRangeException()
return self.skip(index)
def sum(self, _func: Callable = None) -> Union[int, float, complex]:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _func is None and not is_number(self.type):
raise InvalidTypeException()
if _func is None:
_func = _default_lambda
return sum([_func(x) for x in self])
def where(self, _func: Callable = None) -> EnumerableABC:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _func is None:
raise ArgumentNoneException(ExceptionArgument.func)
if _func is None:
_func = _default_lambda
result = Enumerable(self.type)
for element in self:
if _func(element):
result.add(element)
return result

View File

@ -1,19 +1,18 @@
import itertools
from abc import abstractmethod
from typing import Union
from typing import Iterable
from cpl_query.enumerable.enumerable_values import EnumerableValues
from cpl_query.base.queryable_abc import QueryableABC
from cpl_query.base.sequence_abc import SequenceABC
from cpl_query.base.sequence_values import SequenceValues
class EnumerableABC:
class EnumerableABC(SequenceABC, QueryableABC):
r"""ABC to define functions on list
"""
@abstractmethod
def __init__(self, t: type = None, values: Union[list, iter] = None):
if t == any:
t = None
self._type = t
self._values = EnumerableValues(values)
def __init__(self, t: type = None, values: Iterable = None):
SequenceABC.__init__(self, t, values)
self._remove_error_check = True
def set_remove_error_check(self, _value: bool):
@ -21,44 +20,6 @@ class EnumerableABC:
"""
self._remove_error_check = _value
def __len__(self):
return len(self._values)
def __iter__(self):
return iter(self._values)
def next(self):
return next(self._values)
def __next__(self):
return self.next()
def __getitem__(self, n) -> object:
r"""Gets item in enumerable at specified zero-based index
Parameter
--------
n: the index of the item to get
Returns
-------
The element at the specified index.
Raises
------
IndexError if n > number of elements in the iterable
"""
for i, e in enumerate(self):
if i == n:
return e
def __repr__(self):
return f'<EnumerableABC {list(self).__repr__()}>'
@property
def type(self) -> type:
return self._type
def add(self, __object: object) -> None:
r"""Adds an element to the enumerable.
"""
@ -68,16 +29,7 @@ class EnumerableABC:
if len(self) == 0 and self._type is None:
self._type = type(__object)
self._values = EnumerableValues([*self._values, __object])
def count(self) -> int:
r"""Returns count of elements
Returns
-------
int
"""
return sum(1 for element in self)
self._values = SequenceValues([*self._values, __object], self._type)
def clear(self):
r"""Removes all elements
@ -85,19 +37,16 @@ class EnumerableABC:
del self._values
self._values = []
@staticmethod
def empty():
r"""Returns an empty enumerable
def extend(self, __list: Iterable) -> 'EnumerableABC':
r"""Adds elements of given list to enumerable
Returns
-------
Enumerable object that contains no elements
Parameter
---------
__enumerable: :class: `cpl_query.enumerable.enumerable_abc.EnumerableABC`
index
"""
return EnumerableABC()
@staticmethod
def range(start: int, length: int) -> 'EnumerableABC':
return EnumerableABC(int, range(start, start + length, 1))
self._values = SequenceValues([*self._values, *__list], self._type)
return self
def remove(self, __object: object) -> None:
r"""Removes element from list
@ -115,13 +64,4 @@ class EnumerableABC:
raise Exception('Element not found')
# self._values.remove(__object)
self._values = EnumerableValues([x for x in self.to_list() if x != __object])
def to_list(self) -> list:
r"""Converts :class: `cpl_query.enumerable.enumerable_abc.EnumerableABC` to :class: `list`
Returns
-------
:class: `list`
"""
return [x for x in self]
self._values = SequenceValues([x for x in self.to_list() if x != __object], self._type)

View File

@ -1,31 +0,0 @@
import io
import itertools
class EnumerableValues:
def __init__(self, data):
if data is None:
data = []
if not hasattr(data, '__iter__'):
raise TypeError('RepeatableIterable must be instantiated with an iterable object')
is_generator = hasattr(data, 'gi_running') or isinstance(data, io.TextIOBase)
self._data = data if not is_generator else [i for i in data]
self._len = sum(1 for item in self._data)
self.cycle = itertools.cycle(self._data)
def __len__(self):
return self._len
def __iter__(self):
i = 0
while i < len(self):
yield next(self.cycle)
i += 1
def __next__(self):
return self.next()
def next(self):
return next(self.cycle)

View File

@ -0,0 +1,36 @@
from collections.abc import Callable
from typing import Iterable
from cpl_query.enumerable.enumerable import Enumerable
from cpl_query.enumerable.ordered_enumerable_abc import OrderedEnumerableABC
from cpl_query.exceptions import ArgumentNoneException, ExceptionArgument
class OrderedEnumerable(Enumerable, OrderedEnumerableABC):
r"""Implementation of :class: `cpl_query.extension.Enumerable` `cpl_query.extension.OrderedEnumerableABC`
"""
def __init__(self, _t: type, _func: Callable = None, _values: Iterable = None):
Enumerable.__init__(self, _t)
OrderedEnumerableABC.__init__(self, _t, _func, _values)
def then_by(self: OrderedEnumerableABC, _func: Callable) -> OrderedEnumerableABC:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _func is None:
raise ArgumentNoneException(ExceptionArgument.func)
self._funcs.append(_func)
return OrderedEnumerable(self.type, _func, sorted(self, key=lambda *args: [f(*args) for f in self._funcs]))
def then_by_descending(self: OrderedEnumerableABC, _func: Callable) -> OrderedEnumerableABC:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _func is None:
raise ArgumentNoneException(ExceptionArgument.func)
self._funcs.append(_func)
return OrderedEnumerable(self.type, _func, sorted(self, key=lambda *args: [f(*args) for f in self._funcs], reverse=True))

View File

@ -0,0 +1,43 @@
from abc import abstractmethod
from collections.abc import Callable
from typing import Iterable
from cpl_query.enumerable.enumerable_abc import EnumerableABC
class OrderedEnumerableABC(EnumerableABC):
@abstractmethod
def __init__(self, _t: type, _func: Callable = None, _values: Iterable = None):
EnumerableABC.__init__(self, _t, _values)
self._funcs: list[Callable] = []
if _func is not None:
self._funcs.append(_func)
@abstractmethod
def then_by(self, func: Callable) -> 'OrderedEnumerableABC':
r"""Sorts OrderedList in ascending order by function
Parameter
---------
func: :class:`Callable`
Returns
-------
list of :class:`cpl_query.extension.OrderedEnumerableABC`
"""
pass
@abstractmethod
def then_by_descending(self, func: Callable) -> 'OrderedEnumerableABC':
r"""Sorts OrderedList in descending order by function
Parameter
---------
func: :class:`Callable`
Returns
-------
list of :class:`cpl_query.extension.OrderedEnumerableABC`
"""
pass

View File

@ -23,8 +23,8 @@ class IndexOutOfRangeException(Exception):
r"""Exception when index is out of range
"""
def __init__(self):
Exception.__init__(self, f'List index out of range')
def __init__(self, err: str = None):
Exception.__init__(self, f'List index out of range' if err is None else err)
class InvalidTypeException(Exception):

View File

@ -1,9 +0,0 @@
from cpl_query.enumerable.enumerable_abc import EnumerableABC
class LazyList(EnumerableABC):
r"""Implementation of :class: `cpl_query.enumerable.enumerable_abc.EnumerableABC`
"""
def __init__(self, t: type = None, values: list = None):
EnumerableABC.__init__(self, t, values)

View File

@ -1,8 +1,13 @@
from typing import Callable, Optional, Union
from cpl_query._helper import is_number
from cpl_query.exceptions import ArgumentNoneException, ExceptionArgument, InvalidTypeException, IndexOutOfRangeException
from cpl_query.iterable.iterable_abc import IterableABC
from cpl_query.iterable.ordered_iterable_abc import OrderedIterableABC
from cpl_query.query import Query
def _default_lambda(x: object):
return x
class Iterable(IterableABC):
@ -10,97 +15,329 @@ class Iterable(IterableABC):
def __init__(self, t: type = None, values: list = None):
IterableABC.__init__(self, t, values)
def any(self, func: Callable) -> bool:
return Query.any(self, func)
def all(self, _func: Callable = None) -> bool:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
def all(self, func: Callable) -> bool:
return Query.all(self, func)
if _func is None:
_func = _default_lambda
def average(self, func: Callable = None) -> Union[int, float, complex]:
return Query.avg(self, func)
result = self.where(_func)
return len(result) == len(self)
def contains(self, value: object) -> bool:
return Query.contains(self, value)
def any(self, _func: Callable = None) -> bool:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
def count(self, func: Callable = None) -> int:
return Query.count(self, func)
if _func is None:
_func = _default_lambda
def distinct(self, func: Callable = None) -> IterableABC:
return self.__to_self(Query.distinct(self, func))
result = self.where(_func)
return len(result) > 0
def element_at(self, index: int) -> any:
return Query.element_at(self, index)
def average(self, _func: Callable = None) -> Union[int, float, complex]:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
def element_at_or_default(self, index: int) -> Optional[any]:
return Query.element_at_or_default(self, index)
if _func is None and not is_number(self.type):
raise InvalidTypeException()
def last(self) -> any:
return Query.last(self)
if _func is None:
_func = _default_lambda
def last_or_default(self) -> Optional[any]:
return Query.last_or_default(self)
return float(self.sum(_func)) / float(self.count())
def first(self) -> any:
return Query.first(self)
def contains(self, _value: object) -> bool:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
def first_or_default(self) -> Optional[any]:
return Query.first_or_default(self)
if _value is None:
raise ArgumentNoneException(ExceptionArgument.value)
def for_each(self, func: Callable):
Query.for_each(self, func)
return _value in self
def max(self, func: Callable = None) -> Union[int, float, complex]:
return Query.max(self, func)
def count(self, _func: Callable = None) -> int:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
def min(self, func: Callable = None) -> Union[int, float, complex]:
return Query.min(self, func)
if _func is None:
return len(self)
return len(self.where(_func))
def distinct(self, _func: Callable = None) -> IterableABC:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _func is None:
_func = _default_lambda
result = Iterable()
known_values = []
for element in self:
value = _func(element)
if value in known_values:
continue
known_values.append(value)
result.append(element)
return result
def element_at(self, _index: int) -> any:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _index is None:
raise ArgumentNoneException(ExceptionArgument.index)
return self[_index]
def element_at_or_default(self, _index: int) -> any:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _index is None:
raise ArgumentNoneException(ExceptionArgument.index)
try:
return self[_index]
except IndexError:
return None
def first(self: IterableABC) -> any:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if len(self) == 0:
raise IndexOutOfRangeException()
return self[0]
def first_or_default(self: IterableABC) -> Optional[any]:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if len(self) == 0:
return None
return self[0]
def last(self: IterableABC) -> any:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if len(self) == 0:
raise IndexOutOfRangeException()
return self[len(self) - 1]
def last_or_default(self: IterableABC) -> Optional[any]:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if len(self) == 0:
return None
return self[len(self) - 1]
def for_each(self, _func: Callable = None):
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _func is None:
_func = _default_lambda
for element in self:
_func(element)
def max(self, _func: Callable = None) -> Union[int, float, complex]:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _func is None and not is_number(self.type):
raise InvalidTypeException()
if _func is None:
_func = _default_lambda
return _func(max(self, key=_func))
def median(self, _func=None) -> Union[int, float]:
if _func is None:
_func = _default_lambda
result = self.order_by(_func).select(_func).to_list()
length = len(result)
i = int(length / 2)
return (
result[i]
if length % 2 == 1
else (float(result[i - 1]) + float(result[i])) / float(2)
)
def min(self, _func: Callable = None) -> Union[int, float, complex]:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _func is None and not is_number(self.type):
raise InvalidTypeException()
if _func is None:
_func = _default_lambda
return _func(min(self, key=_func))
def order_by(self, _func: Callable = None) -> OrderedIterableABC:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _func is None:
_func = _default_lambda
def order_by(self, func: Callable) -> OrderedIterableABC:
res = Query.order_by(self, func)
from cpl_query.iterable.ordered_iterable import OrderedIterable
res.__class__ = OrderedIterable
return res
return OrderedIterable(self.type, _func, sorted(self, key=_func))
def order_by_descending(self, _func: Callable = None) -> OrderedIterableABC:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _func is None:
_func = _default_lambda
def order_by_descending(self, func: Callable) -> OrderedIterableABC:
res = Query.order_by_descending(self, func)
from cpl_query.iterable.ordered_iterable import OrderedIterable
res.__class__ = OrderedIterable
return res
return OrderedIterable(self.type, _func, sorted(self, key=_func, reverse=True))
def reverse(self) -> IterableABC:
return Query.reverse(self)
def reverse(self: IterableABC) -> IterableABC:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
def single(self) -> any:
return Query.single(self)
return Iterable().extend(reversed(self.to_list()))
def single_or_default(self) -> Optional[any]:
return Query.single_or_default(self)
def select(self, _func: Callable = None) -> IterableABC:
if _func is None:
_func = _default_lambda
def select(self, _f: Callable) -> IterableABC:
return self.__to_self(Query.select(self, _f))
result = Iterable()
result.extend(_func(_o) for _o in self)
return result
def select_many(self, _f: Callable) -> IterableABC:
return self.__to_self(Query.select_many(self, _f))
def select_many(self, _func: Callable = None) -> IterableABC:
if _func is None:
_func = _default_lambda
def skip(self, index: int) -> IterableABC:
return self.__to_self(Query.skip(self, index))
result = Iterable()
# The line below is pain. I don't understand anything of it...
# written on 09.11.2022 by Sven Heidemann
elements = [_a for _o in self for _a in _func(_o)]
def skip_last(self, index: int) -> IterableABC:
return self.__to_self(Query.skip_last(self, index))
result.extend(elements)
return result
def sum(self, func: Callable = None) -> Union[int, float, complex]:
return Query.sum(self, func)
def single(self: IterableABC) -> any:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
def take(self, index: int) -> IterableABC:
return self.__to_self(Query.take(self, index))
if len(self) > 1:
raise Exception('Found more than one element')
elif len(self) == 0:
raise Exception('Found no element')
def take_last(self, index: int) -> IterableABC:
return self.__to_self(Query.take_last(self, index))
return self[0]
def where(self, func: Callable) -> IterableABC:
return self.__to_self(Query.where(self, func))
def single_or_default(self: IterableABC) -> Optional[any]:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
@staticmethod
def __to_self(obj: IterableABC) -> IterableABC:
obj.__class__ = Iterable
return obj
if len(self) > 1:
raise Exception('Index out of range')
elif len(self) == 0:
return None
return self[0]
def skip(self, _index: int) -> IterableABC:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _index is None:
raise ArgumentNoneException(ExceptionArgument.index)
if _index >= len(self):
raise IndexOutOfRangeException()
return Iterable(self.type, values=self[_index:])
def skip_last(self, _index: int) -> IterableABC:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _index is None:
raise ArgumentNoneException(ExceptionArgument.index)
index = len(self) - _index
if index >= len(self) or index < 0:
raise IndexOutOfRangeException()
result = Iterable()
result.extend(self[:index])
return result
def take(self, _index: int) -> IterableABC:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _index is None:
raise ArgumentNoneException(ExceptionArgument.index)
if _index >= len(self):
raise IndexOutOfRangeException()
result = Iterable()
result.extend(self[:_index])
return result
def take_last(self, _index: int) -> IterableABC:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _index is None:
raise ArgumentNoneException(ExceptionArgument.index)
index = len(self) - _index
if index >= len(self) or index < 0:
raise IndexOutOfRangeException()
result = Iterable()
result.extend(self[index:])
return result
def sum(self, _func: Callable = None) -> Union[int, float, complex]:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _func is None and not is_number(self.type):
raise InvalidTypeException()
if _func is None:
_func = _default_lambda
return sum([_func(x) for x in self])
def where(self, _func: Callable = None) -> IterableABC:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _func is None:
raise ArgumentNoneException(ExceptionArgument.func)
if _func is None:
_func = _default_lambda
result = Iterable(self.type)
for element in self:
if _func(element):
result.append(element)
return result

View File

@ -1,33 +1,38 @@
from abc import ABC, abstractmethod
from typing import Optional, Callable, Union, Iterable
from abc import abstractmethod
from typing import Iterable
from cpl_query.base.queryable_abc import QueryableABC
from cpl_query.enumerable.enumerable_abc import EnumerableABC
from cpl_query.base.sequence_abc import SequenceABC
from cpl_query.base.sequence_values import SequenceValues
class IterableABC(QueryableABC, list):
class IterableABC(SequenceABC, QueryableABC):
r"""ABC to define functions on list
"""
@abstractmethod
def __init__(self, t: type = None, values: list = None):
list.__init__(self)
SequenceABC.__init__(self, t, values)
if t == any:
t = None
self._type = t
def __getitem__(self, n) -> object:
r"""Gets item in enumerable at specified zero-based index
if values is not None:
for value in values:
self.append(value)
Parameter
--------
n: the index of the item to get
@property
def type(self) -> type:
return self._type
Returns
-------
The element at the specified index.
Raises
------
IndexError if n > number of elements in the iterable
"""
return list.__getitem__(self.to_list(), n)
def append(self, __object: object) -> None:
r"""Adds element to list
Parameter
---------
__object: :class:`object`
@ -39,11 +44,10 @@ class IterableABC(QueryableABC, list):
if len(self) == 0 and self._type is None:
self._type = type(__object)
super().append(__object)
self._values = SequenceValues([*self._values, __object], self._type)
def extend(self, __iterable: Iterable) -> 'IterableABC':
r"""Adds elements of given list to list
Parameter
---------
__iterable: :class: `cpl_query.extension.iterable.Iterable`
@ -53,12 +57,3 @@ class IterableABC(QueryableABC, list):
self.append(value)
return self
def to_list(self) -> list:
r"""Converts :class: `cpl_query.enumerable.enumerable_abc.EnumerableABC` to :class: `list`
Returns
-------
:class: `list`
"""
return list(self)

View File

@ -1,22 +1,35 @@
from collections.abc import Callable
from cpl_query.exceptions import ArgumentNoneException, ExceptionArgument
from cpl_query.iterable.iterable import Iterable
from cpl_query.iterable.ordered_iterable_abc import OrderedIterableABC
from cpl_query.query import Query
class OrderedIterable(Iterable, OrderedIterableABC):
r"""Implementation of :class: `cpl_query.extension.Iterable` `cpl_query.extension.OrderedIterableABC`
"""
def __init__(self, _t: type = None):
def __init__(self, _t: type, _func: Callable = None, _values: Iterable = None):
Iterable.__init__(self, _t)
OrderedIterableABC.__init__(self, _t)
OrderedIterableABC.__init__(self, _t, _func, _values)
def then_by(self, _func: Callable) -> OrderedIterableABC:
self._funcs.append(_func)
return Query.then_by(self, lambda *args: [f(*args) for f in self._funcs])
def then_by(self: OrderedIterableABC, _func: Callable) -> OrderedIterableABC:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _func is None:
raise ArgumentNoneException(ExceptionArgument.func)
def then_by_descending(self, _func: Callable) -> OrderedIterableABC:
self._funcs.append(_func)
return Query.then_by_descending(self, lambda *args: [f(*args) for f in self._funcs])
return OrderedIterable(self.type, _func, sorted(self, key=lambda *args: [f(*args) for f in self._funcs]))
def then_by_descending(self: OrderedIterableABC, _func: Callable) -> OrderedIterableABC:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _func is None:
raise ArgumentNoneException(ExceptionArgument.func)
self._funcs.append(_func)
return OrderedIterable(self.type, _func, sorted(self, key=lambda *args: [f(*args) for f in self._funcs], reverse=True))

View File

@ -1,5 +1,6 @@
from abc import abstractmethod
from collections.abc import Callable
from typing import Iterable
from cpl_query.iterable.iterable_abc import IterableABC
@ -7,8 +8,8 @@ from cpl_query.iterable.iterable_abc import IterableABC
class OrderedIterableABC(IterableABC):
@abstractmethod
def __init__(self, _t: type, _func: Callable = None):
IterableABC.__init__(self, _t)
def __init__(self, _t: type, _func: Callable = None, _values: Iterable = None):
IterableABC.__init__(self, _t, _values)
self._funcs: list[Callable] = []
if _func is not None:
self._funcs.append(_func)
@ -23,7 +24,7 @@ class OrderedIterableABC(IterableABC):
Returns
-------
list of :class:`cpl_query.extension.OrderedIterableABC`
list of :class:`cpl_query.iterable.ordered_iterable_abc.OrderedIterableABC`
"""
pass
@ -37,6 +38,6 @@ class OrderedIterableABC(IterableABC):
Returns
-------
list of :class:`cpl_query.extension.OrderedIterableABC`
list of :class:`cpl_query.iterable.ordered_iterable_abc.OrderedIterableABC`
"""
pass

View File

@ -1,405 +0,0 @@
from typing import Callable, Union, Optional
from cpl_query._helper import is_number
from cpl_query.exceptions import ArgumentNoneException, ExceptionArgument, InvalidTypeException, IndexOutOfRangeException
from cpl_query.iterable.iterable_abc import IterableABC
from cpl_query.iterable.ordered_iterable_abc import OrderedIterableABC
class Query:
@staticmethod
def all(_list: IterableABC, _func: Callable) -> bool:
if _list is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _func is None:
raise ArgumentNoneException(ExceptionArgument.func)
result = Query.where(_list, _func)
return len(result) == len(_list)
@staticmethod
def any(_list: IterableABC, _func: Callable) -> bool:
if _list is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _func is None:
raise ArgumentNoneException(ExceptionArgument.func)
result = Query.where(_list, _func)
return len(result) > 0
@staticmethod
def avg(_list: IterableABC, _func: Callable) -> Union[int, float, complex]:
if _list is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _func is None and not is_number(_list.type):
raise InvalidTypeException()
average = 0
count = len(_list)
for element in _list:
if _func is not None:
value = _func(element)
else:
value = element
average += value
return average / count
@staticmethod
def contains(_list: IterableABC, _value: object) -> bool:
if _list is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _value is None:
raise ArgumentNoneException(ExceptionArgument.value)
return _value in _list
@staticmethod
def count(_list: IterableABC, _func: Callable = None) -> int:
if _list is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _func is None:
return len(_list)
return len(Query.where(_list, _func))
@staticmethod
def distinct(_list: IterableABC, _func: Callable) -> IterableABC:
if _list is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _func is None:
raise ArgumentNoneException(ExceptionArgument.func)
result = IterableABC()
known_values = []
for element in _list:
value = _func(element)
if value in known_values:
continue
known_values.append(value)
result.append(element)
return result
@staticmethod
def element_at(_list: IterableABC, _index: int) -> any:
if _list is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _index is None:
raise ArgumentNoneException(ExceptionArgument.index)
return _list[_index]
@staticmethod
def element_at_or_default(_list: IterableABC, _index: int) -> any:
if _list is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _index is None:
raise ArgumentNoneException(ExceptionArgument.index)
try:
return _list[_index]
except IndexError:
return None
@staticmethod
def first(_list: IterableABC) -> any:
if _list is None:
raise ArgumentNoneException(ExceptionArgument.list)
if len(_list) == 0:
raise IndexOutOfRangeException()
return _list[0]
@staticmethod
def first_or_default(_list: IterableABC) -> Optional[any]:
if _list is None:
raise ArgumentNoneException(ExceptionArgument.list)
if len(_list) == 0:
return None
return _list[0]
@staticmethod
def last(_list: IterableABC) -> any:
if _list is None:
raise ArgumentNoneException(ExceptionArgument.list)
if len(_list) == 0:
raise IndexOutOfRangeException()
return _list[len(_list) - 1]
@staticmethod
def last_or_default(_list: IterableABC) -> Optional[any]:
if _list is None:
raise ArgumentNoneException(ExceptionArgument.list)
if len(_list) == 0:
return None
return _list[len(_list) - 1]
@staticmethod
def for_each(_list: IterableABC, _func: Callable):
if _list is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _func is None:
raise ArgumentNoneException(ExceptionArgument.func)
for element in _list:
_func(element)
@staticmethod
def max(_list: IterableABC, _func: Callable) -> Union[int, float, complex]:
if _list is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _func is None and not is_number(_list.type):
raise InvalidTypeException()
max_value = 0
for element in _list:
if _func is not None:
value = _func(element)
else:
value = element
if value > max_value:
max_value = value
return max_value
@staticmethod
def min(_list: IterableABC, _func: Callable) -> Union[int, float, complex]:
if _list is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _func is None and not is_number(_list.type):
raise InvalidTypeException()
min_value = 0
is_first = True
for element in _list:
if _func is not None:
value = _func(element)
else:
value = element
if is_first:
min_value = value
is_first = False
if value < min_value:
min_value = value
return min_value
@staticmethod
def order_by(_list: IterableABC, _func: Callable) -> OrderedIterableABC:
result = OrderedIterableABC(_list.type, _func)
_list.sort(key=_func)
result.extend(_list)
return result
@staticmethod
def order_by_descending(_list: IterableABC, _func: Callable) -> OrderedIterableABC:
if _list is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _func is None:
raise ArgumentNoneException(ExceptionArgument.func)
result = OrderedIterableABC(_list.type, _func)
_list.sort(key=_func, reverse=True)
result.extend(_list)
return result
@staticmethod
def then_by(_list: OrderedIterableABC, _func: Callable) -> OrderedIterableABC:
if _list is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _func is None:
raise ArgumentNoneException(ExceptionArgument.func)
_list.sort(key=_func)
return _list
@staticmethod
def then_by_descending(_list: OrderedIterableABC, _func: Callable) -> OrderedIterableABC:
if _list is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _func is None:
raise ArgumentNoneException(ExceptionArgument.func)
_list.sort(key=_func, reverse=True)
return _list
@staticmethod
def reverse(_list: IterableABC) -> IterableABC:
if _list is None:
raise ArgumentNoneException(ExceptionArgument.list)
result = IterableABC()
_copied_list = _list.to_list()
_copied_list.reverse()
for element in _copied_list:
result.append(element)
return result
@staticmethod
def select(_list: IterableABC, _f: Callable) -> any:
result = IterableABC()
result.extend(_f(_o) for _o in _list)
return result
@staticmethod
def select_many(_list: IterableABC, _f: Callable) -> any:
result = IterableABC()
# The line below is pain. I don't understand anything of it...
# written on 09.11.2022 by Sven Heidemann
elements = [_a for _o in _list for _a in _f(_o)]
result.extend(elements)
return result
@staticmethod
def single(_list: IterableABC) -> any:
if _list is None:
raise ArgumentNoneException(ExceptionArgument.list)
if len(_list) > 1:
raise Exception('Found more than one element')
elif len(_list) == 0:
raise Exception('Found no element')
return _list[0]
@staticmethod
def single_or_default(_list: IterableABC) -> Optional[any]:
if _list is None:
raise ArgumentNoneException(ExceptionArgument.list)
if len(_list) > 1:
raise Exception('Index out of range')
elif len(_list) == 0:
return None
return _list[0]
@staticmethod
def skip(_list: IterableABC, _index: int) -> IterableABC:
if _list is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _index is None:
raise ArgumentNoneException(ExceptionArgument.index)
if _index >= len(_list):
raise IndexOutOfRangeException()
result = IterableABC()
result.extend(_list[_index:])
return result
@staticmethod
def skip_last(_list: IterableABC, _index: int) -> IterableABC:
if _list is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _index is None:
raise ArgumentNoneException(ExceptionArgument.index)
index = len(_list) - _index
if index >= len(_list) or index < 0:
raise IndexOutOfRangeException()
result = IterableABC()
result.extend(_list[:index])
return result
@staticmethod
def take(_list: IterableABC, _index: int) -> IterableABC:
if _list is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _index is None:
raise ArgumentNoneException(ExceptionArgument.index)
if _index >= len(_list):
raise IndexOutOfRangeException()
result = IterableABC()
result.extend(_list[:_index])
return result
@staticmethod
def take_last(_list: IterableABC, _index: int) -> IterableABC:
if _list is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _index is None:
raise ArgumentNoneException(ExceptionArgument.index)
index = len(_list) - _index
if index >= len(_list) or index < 0:
raise IndexOutOfRangeException()
result = IterableABC()
result.extend(_list[index:])
return result
@staticmethod
def sum(_list: IterableABC, _func: Callable) -> Union[int, float, complex]:
if _list is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _func is None and not is_number(_list.type):
raise InvalidTypeException()
result = 0
for element in _list:
if _func is not None:
value = _func(element)
else:
value = element
result += value
return result
@staticmethod
def where(_list: IterableABC, _func: Callable) -> IterableABC:
if _list is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _func is None:
raise ArgumentNoneException(ExceptionArgument.func)
result = IterableABC(_list.type)
for element in _list:
if _func(element):
result.append(element)
return result

View File

@ -0,0 +1,357 @@
import string
import unittest
from random import randint
from cpl_core.utils import String
from cpl_query.enumerable.enumerable import Enumerable
from cpl_query.exceptions import InvalidTypeException, ArgumentNoneException, IndexOutOfRangeException
from unittests_query.models import User, Address
class EnumerableQueryTestCase(unittest.TestCase):
def setUp(self) -> None:
self._tests = Enumerable(User)
self._t_user = User(
'Test user',
Address(
'teststr.',
15
)
)
self._t_user2 = User(
'Test user',
Address(
'teststr.',
14
)
)
self._generate_test_data()
def _generate_test_data(self):
for i in range(0, 100):
user = User(
String.random_string(string.ascii_letters, 8).lower(),
Address(
String.random_string(string.ascii_letters, 10).lower(),
randint(1, 10)
)
)
self._tests.add(user)
self._tests.add(self._t_user)
self._tests.add(self._t_user2)
def test_any(self):
results = []
for user in self._tests:
if user.address.nr == 10:
results.append(user)
res = self._tests.any(lambda u: u.address.nr == 10)
n_res = self._tests.any(lambda u: u.address.nr == 100)
self.assertTrue(res)
self.assertFalse(n_res)
def test_all(self):
results = []
for user in self._tests:
if user.address.nr == 10:
results.append(user)
res = self._tests.all(lambda u: u.address is not None)
n_res = self._tests.all(lambda u: u.address.nr == 100)
self.assertTrue(res)
self.assertFalse(n_res)
def test_avg(self):
avg = 0
for user in self._tests:
avg += user.address.nr
avg = avg / len(self._tests)
res = self._tests.average(lambda u: u.address.nr)
self.assertEqual(avg, res)
def invalid():
tests = Enumerable(str, ['hello', 'world'])
e_res = tests.average()
self.assertRaises(InvalidTypeException, invalid)
tests = Enumerable(int, list(range(0, 100)))
self.assertEqual(sum(tests) / len(tests), tests.average())
def wrong2():
tests2 = Enumerable(int, values=list(range(0, 100)))
e_res = tests2.average(lambda u: u.address.nr)
self.assertRaises(AttributeError, wrong2)
def test_contains(self):
self.assertTrue(self._tests.contains(self._t_user))
self.assertFalse(self._tests.contains(User("Test", None)))
def test_count(self):
self.assertEqual(len(self._tests), self._tests.count())
self.assertEqual(1, self._tests.count(lambda u: u == self._t_user))
def test_distinct(self):
res = self._tests.distinct(lambda u: u.address.nr).where(lambda u: u.address.nr == 5)
self.assertEqual(1, len(res))
def test_element_at(self):
index = randint(0, len(self._tests) - 1)
self.assertEqual(self._tests.element_at(index), self._tests.element_at(index))
def invalid():
result = self._tests.element_at(len(self._tests))
self.assertRaises(IndexOutOfRangeException, invalid)
def test_element_at_or_default(self):
index = randint(0, len(self._tests) - 1)
self.assertEqual(self._tests.element_at(index), self._tests.element_at_or_default(index))
self.assertIsNone(self._tests.element_at_or_default(len(self._tests)))
def test_last(self):
results = []
for user in self._tests:
if user.address.nr == 10:
results.append(user)
res = self._tests.where(lambda u: u.address.nr == 10)
s_res = self._tests.where(lambda u: u.address.nr == 10).last()
self.assertEqual(len(res), len(results))
self.assertEqual(res.element_at(len(res) - 1), s_res)
def test_last_or_default(self):
results = []
for user in self._tests:
if user.address.nr == 10:
results.append(user)
res = self._tests.where(lambda u: u.address.nr == 10)
s_res = self._tests.where(lambda u: u.address.nr == 10).last_or_default()
sn_res = self._tests.where(lambda u: u.address.nr == 11).last_or_default()
self.assertEqual(len(res), len(results))
self.assertEqual(res.element_at(len(res) - 1), s_res)
self.assertIsNone(sn_res)
def test_first(self):
results = []
for user in self._tests:
if user.address.nr == 10:
results.append(user)
res = self._tests.where(lambda u: u.address.nr == 10)
s_res = self._tests.where(lambda u: u.address.nr == 10).first()
self.assertEqual(len(res), len(results))
self.assertEqual(res.element_at(0), s_res)
def test_first_or_default(self):
results = []
for user in self._tests:
if user.address.nr == 10:
results.append(user)
res = self._tests.where(lambda u: u.address.nr == 10)
s_res = self._tests.where(lambda u: u.address.nr == 10).first_or_default()
sn_res = self._tests.where(lambda u: u.address.nr == 11).first_or_default()
self.assertEqual(len(res), len(results))
self.assertEqual(res.element_at(0), s_res)
self.assertIsNone(sn_res)
def test_for_each(self):
users = []
self._tests.for_each(lambda user: (
users.append(user)
)
)
self.assertEqual(len(users), len(self._tests))
def test_max(self):
res = self._tests.max(lambda u: u.address.nr)
self.assertEqual(res, self._t_user.address.nr)
tests = Enumerable(values=list(range(0, 100)))
self.assertEqual(99, tests.max())
def invalid():
tests = Enumerable(str, ['hello', 'world'])
e_res = tests.average()
self.assertRaises(InvalidTypeException, invalid)
def test_min(self):
res = self._tests.min(lambda u: u.address.nr)
self.assertEqual(1, res)
tests = Enumerable(values=list(range(0, 100)))
self.assertEqual(0, tests.min())
def invalid():
tests = Enumerable(str, ['hello', 'world'])
e_res = tests.average()
self.assertRaises(InvalidTypeException, invalid)
def test_order_by(self):
res = self._tests.order_by(lambda user: user.address.street)
res2 = self._tests.order_by(lambda user: user.address.nr).to_list()
s_res = self._tests.to_list()
s_res.sort(key=lambda user: user.address.street)
self.assertEqual(res.to_list(), s_res)
s_res = self._tests.to_list()
s_res.sort(key=lambda user: user.address.nr)
self.assertEqual(res2, s_res)
self.assertEqual(self._t_user, res.where(lambda u: u.address.nr == self._t_user.address.nr).single())
def test_order_by_descending(self):
res = self._tests.order_by_descending(lambda user: user.address.street).to_list()
res2 = self._tests.order_by_descending(lambda user: user.address.nr).to_list()
s_res = self._tests.to_list()
s_res.sort(key=lambda user: user.address.street, reverse=True)
self.assertEqual(res, s_res)
s_res = self._tests.to_list()
s_res.sort(key=lambda user: user.address.nr, reverse=True)
self.assertEqual(res2, s_res)
def test_then_by(self):
res = self._tests.order_by(lambda user: user.address.street).then_by(lambda user: user.address.nr).to_list()
s_res = self._tests.to_list()
s_res.sort(key=lambda user: (user.address.street, user.address.nr))
self.assertEqual(res, s_res)
def test_then_by_descending(self):
res = self._tests.order_by_descending(lambda user: user.address.street).then_by_descending(lambda user: user.address.nr).to_list()
s_res = self._tests.to_list()
s_res.sort(key=lambda user: (user.address.street, user.address.nr), reverse=True)
self.assertEqual(res, s_res)
def test_reverse(self):
res = self._tests.reverse().to_list()
l_res = self._tests.to_list()
l_res.reverse()
self.assertEqual(res, l_res)
def test_select(self):
range_list = Enumerable(int, range(0, 100))
selected_range = range_list.select(lambda x: x + 1)
modulo_range = []
for x in range(0, 100):
if x % 2 == 0:
modulo_range.append(x)
self.assertEqual(selected_range.to_list(), list(range(1, 101)))
self.assertEqual(range_list.where(lambda x: x % 2 == 0).to_list(), modulo_range)
def test_select_many(self):
range_list = Enumerable(int, list(range(0, 100)))
selected_range = range_list.select(lambda x: [x, x])
self.assertEqual(selected_range.to_list(), [[x, x] for x in range(0, 100)])
self.assertEqual(selected_range.select_many(lambda x: x).to_list(), [_x for _l in [2 * [x] for x in range(0, 100)] for _x in _l])
class TestClass:
def __init__(self, i, is_sub=False):
self.i = i
if is_sub:
return
self.elements = [TestClass(x, True) for x in range(0, 10)]
elements = Enumerable(TestClass, [TestClass(i) for i in range(0, 100)])
selected_elements = elements.select_many(lambda x: x.elements).select(lambda x: x.i)
self.assertEqual(selected_elements.where(lambda x: x == 0).count(), 100)
def test_single(self):
res = self._tests.where(lambda u: u.address.nr == self._t_user.address.nr)
s_res = self._tests.where(lambda u: u.address.nr == self._t_user.address.nr).single()
self.assertEqual(len(res), 1)
self.assertEqual(self._t_user, s_res)
def test_single_or_default(self):
res = self._tests.where(lambda u: u.address.nr == self._t_user.address.nr)
s_res = self._tests.where(lambda u: u.address.nr == self._t_user.address.nr).single_or_default()
sn_res = self._tests.where(lambda u: u.address.nr == self._t_user.address.nr + 1).single_or_default()
self.assertEqual(len(res), 1)
self.assertEqual(self._t_user, s_res)
self.assertIsNone(sn_res)
def test_skip(self):
skipped = self._tests.skip(5).to_list()
self.assertEqual(len(skipped), len(self._tests) - 5)
self.assertEqual(skipped, self._tests.to_list()[5:])
def test_skip_last(self):
skipped = self._tests.skip_last(5)
self.assertEqual(skipped.count(), len(self._tests) - 5)
self.assertEqual(skipped.to_list(), self._tests.to_list()[:-5])
self.assertEqual(skipped.last(), self._tests.to_list()[:-5][len(self._tests.to_list()[:-5]) - 1])
def test_sum(self):
res = self._tests.sum(lambda u: u.address.nr)
s_res = 0
for user in self._tests:
s_res += user.address.nr
self.assertEqual(s_res, res)
tests = Enumerable(values=list(range(0, 100)))
self.assertEqual(0, tests.min())
def invalid():
tests2 = Enumerable(str, ['hello', 'world'])
e_res = tests2.average()
self.assertRaises(InvalidTypeException, invalid)
def test_take(self):
skipped = self._tests.take(5)
self.assertEqual(skipped.count(), 5)
self.assertEqual(skipped.to_list(), self._tests.to_list()[:5])
def test_take_last(self):
skipped = self._tests.take_last(5)
self.assertEqual(skipped.count(), 5)
self.assertEqual(skipped.to_list(), self._tests.to_list()[-5:])
self.assertEqual(skipped.last(), self._tests.to_list()[len(self._tests) - 1])
def test_where(self):
results = []
for user in self._tests:
if user.address.nr == 5:
results.append(user)
res = self._tests.where(lambda u: u.address.nr == 5)
self.assertEqual(len(results), len(res))
def ex():
e_res = self._tests.where(None)
self.assertRaises(ArgumentNoneException, ex)

View File

@ -1,13 +1,12 @@
import unittest
from cpl_query.extension.lazy_list import LazyList
from cpl_query.extension.list import List
from cpl_query.enumerable.enumerable import Enumerable
class EnumerableTestCase(unittest.TestCase):
def setUp(self) -> None:
self._list = LazyList(int)
self._list = Enumerable(int)
def _clear(self):
self._list.clear()
@ -22,19 +21,19 @@ class EnumerableTestCase(unittest.TestCase):
self.assertRaises(Exception, lambda v: self._list.add(v), '3')
def test_default(self):
self.assertEqual(LazyList.empty().to_list(), [])
self.assertEqual(LazyList.range(0, 100).to_list(), list(range(0, 100)))
self.assertEqual(Enumerable.empty().to_list(), [])
self.assertEqual(Enumerable.range(0, 100).to_list(), list(range(0, 100)))
def test_iter(self):
n = 0
elements = LazyList.range(0, 100)
elements = Enumerable.range(0, 100)
while n < 100:
self.assertEqual(elements.next(), n)
n += 1
def test_for(self):
n = 0
for i in LazyList.range(0, 100):
for i in Enumerable.range(0, 100):
self.assertEqual(i, n)
n += 1
@ -43,7 +42,7 @@ class EnumerableTestCase(unittest.TestCase):
self._list.add(2)
self._list.add(3)
self.assertEqual(self._list[2], [1, 2, 3][2])
self.assertEqual(self._list.element_at(2), [1, 2, 3][2])
def test_count(self):
self._list.add(1)

View File

@ -8,7 +8,7 @@ from cpl_query.extension.list import List
from unittests_query.models import User, Address
class QueryTestCase(unittest.TestCase):
class IterableQueryTestCase(unittest.TestCase):
def setUp(self) -> None:
self._tests = List(User)
@ -19,6 +19,13 @@ class QueryTestCase(unittest.TestCase):
15
)
)
self._t_user2 = User(
'Test user',
Address(
'teststr.',
14
)
)
self._generate_test_data()
@ -35,6 +42,7 @@ class QueryTestCase(unittest.TestCase):
self._tests.append(user)
self._tests.append(self._t_user)
self._tests.append(self._t_user2)
def test_any(self):
results = []
@ -169,7 +177,7 @@ class QueryTestCase(unittest.TestCase):
def test_max(self):
res = self._tests.max(lambda u: u.address.nr)
self.assertEqual(self._t_user.address.nr, res)
self.assertEqual(res, self._t_user.address.nr)
tests = List(values=list(range(0, 100)))
self.assertEqual(99, tests.max())
@ -195,40 +203,41 @@ class QueryTestCase(unittest.TestCase):
def test_order_by(self):
res = self._tests.order_by(lambda user: user.address.street)
res2 = self._tests.order_by(lambda user: user.address.nr)
s_res = self._tests
res2 = self._tests.order_by(lambda user: user.address.nr).to_list()
s_res = self._tests.to_list()
s_res.sort(key=lambda user: user.address.street)
self.assertEqual(res.to_list(), s_res)
self.assertEqual(res, s_res)
s_res = self._tests.to_list()
s_res.sort(key=lambda user: user.address.nr)
self.assertEqual(res2, s_res)
self.assertEqual(self._t_user, res.where(lambda u: u.address.nr == self._t_user.address.nr).single())
def test_order_by_descending(self):
res = self._tests.order_by_descending(lambda user: user.address.street)
res2 = self._tests.order_by_descending(lambda user: user.address.nr)
s_res = self._tests
res = self._tests.order_by_descending(lambda user: user.address.street).to_list()
res2 = self._tests.order_by_descending(lambda user: user.address.nr).to_list()
s_res = self._tests.to_list()
s_res.sort(key=lambda user: user.address.street, reverse=True)
self.assertEqual(res, s_res)
s_res = self._tests.to_list()
s_res.sort(key=lambda user: user.address.nr, reverse=True)
self.assertEqual(res2, s_res)
def test_then_by(self):
res = self._tests.order_by(lambda user: user.address.street[0]).then_by(lambda user: user.address.nr)
res = self._tests.order_by(lambda user: user.address.street).then_by(lambda user: user.address.nr).to_list()
s_res = self._tests
s_res.sort(key=lambda user: (user.address.street[0], user.address.nr))
s_res = self._tests.to_list()
s_res.sort(key=lambda user: (user.address.street, user.address.nr))
self.assertEqual(res, s_res)
def test_then_by_descending(self):
res = self._tests.order_by_descending(lambda user: user.address.street[0]).then_by_descending(
lambda user: user.address.nr)
res = self._tests.order_by_descending(lambda user: user.address.street).then_by_descending(lambda user: user.address.nr).to_list()
s_res = self._tests
s_res.sort(key=lambda user: (user.address.street[0], user.address.nr), reverse=True)
s_res = self._tests.to_list()
s_res.sort(key=lambda user: (user.address.street, user.address.nr), reverse=True)
self.assertEqual(res, s_res)
@ -237,7 +246,7 @@ class QueryTestCase(unittest.TestCase):
l_res = self._tests.to_list()
l_res.reverse()
self.assertEqual(l_res, res)
self.assertEqual(res.to_list(), l_res)
def test_select(self):
range_list = List(int, range(0, 100))
@ -251,10 +260,10 @@ class QueryTestCase(unittest.TestCase):
self.assertEqual(range_list.where(lambda x: x % 2 == 0).to_list(), modulo_range)
def test_select_many(self):
range_list = List(int, range(0, 100))
range_list = List(int, list(range(0, 100)))
selected_range = range_list.select(lambda x: [x, x])
self.assertEqual(selected_range, [[x, x] for x in range(0, 100)])
self.assertEqual(selected_range.to_list(), [[x, x] for x in range(0, 100)])
self.assertEqual(selected_range.select_many(lambda x: x).to_list(), [_x for _l in [2 * [x] for x in range(0, 100)] for _x in _l])
class TestClass:
@ -285,17 +294,17 @@ class QueryTestCase(unittest.TestCase):
self.assertIsNone(sn_res)
def test_skip(self):
skipped = self._tests.skip(5)
skipped = self._tests.skip(5).to_list()
self.assertEqual(len(self._tests) - 5, len(skipped))
self.assertEqual(self._tests[5:], skipped)
self.assertEqual(len(skipped), len(self._tests) - 5)
self.assertEqual(skipped, self._tests[5:])
def test_skip_last(self):
skipped = self._tests.skip_last(5)
self.assertEqual(len(self._tests) - 5, len(skipped))
self.assertEqual(self._tests[:-5], skipped)
self.assertEqual(self._tests[:-5][len(self._tests[:-5]) - 1], skipped.last())
self.assertEqual(skipped.count(), len(self._tests) - 5)
self.assertEqual(skipped.to_list(), self._tests[:-5])
self.assertEqual(skipped.last(), self._tests[:-5][len(self._tests[:-5]) - 1])
def test_sum(self):
res = self._tests.sum(lambda u: u.address.nr)
@ -318,15 +327,15 @@ class QueryTestCase(unittest.TestCase):
def test_take(self):
skipped = self._tests.take(5)
self.assertEqual(5, len(skipped))
self.assertEqual(self._tests[:5], skipped)
self.assertEqual(skipped.count(), 5)
self.assertEqual(skipped.to_list(), self._tests[:5])
def test_take_last(self):
skipped = self._tests.take_last(5)
self.assertEqual(5, len(skipped))
self.assertEqual(self._tests[-5:], skipped)
self.assertEqual(self._tests[len(self._tests) - 1], skipped.last())
self.assertEqual(skipped.count(), 5)
self.assertEqual(skipped.to_list(), self._tests[-5:])
self.assertEqual(skipped.last(), self._tests[len(self._tests) - 1])
def test_where(self):
results = []

View File

@ -17,5 +17,5 @@ class IterableTestCase(unittest.TestCase):
self._list.append(2)
self._list.append(3)
self.assertEqual(self._list, [1, 2, 3])
self.assertEqual(self._list.to_list(), [1, 2, 3])
self.assertRaises(Exception, lambda v: self._list.append(v), '3')

View File

@ -1,8 +1,9 @@
import unittest
from unittests_query.enumerable_query_test_case import EnumerableQueryTestCase
from unittests_query.enumerable_test_case import EnumerableTestCase
from unittests_query.iterable_query_test_case import IterableQueryTestCase
from unittests_query.iterable_test_case import IterableTestCase
from unittests_query.query_test_case import QueryTestCase
class QueryTestSuite(unittest.TestSuite):
@ -11,9 +12,10 @@ class QueryTestSuite(unittest.TestSuite):
unittest.TestSuite.__init__(self)
loader = unittest.TestLoader()
self.addTests(loader.loadTestsFromTestCase(QueryTestCase))
self.addTests(loader.loadTestsFromTestCase(EnumerableTestCase))
self.addTests(loader.loadTestsFromTestCase(EnumerableQueryTestCase))
self.addTests(loader.loadTestsFromTestCase(IterableTestCase))
self.addTests(loader.loadTestsFromTestCase(IterableQueryTestCase))
def run(self, *args):
super().run(*args)