from typing import Any, Callable, Optional, Union

from cpl_query._helper import is_number
from cpl_query.base.sequence import Sequence
from cpl_query.exceptions import (
    InvalidTypeException,
    ArgumentNoneException,
    ExceptionArgument,
    IndexOutOfRangeException,
)


def _default_lambda(x: object):
    """Identity selector used when a query method is called without a function."""
    return x


class QueryableABC(Sequence):
    r"""Query helpers (filtering, projection, aggregation, slicing) on top of Sequence

    Methods that return a collection build a new instance via ``type(self)``,
    so the concrete subclass is preserved.
    """

    def __init__(self, t: type = None, values: list = None):
        Sequence.__init__(self, t, values)

    def all(self, _func: Callable = None) -> bool:
        r"""Checks if every element satisfies the given function

        Parameter
        ---------
            func: :class:`Callable`
                predicate; defaults to the element itself (truthiness)

        Returns
        -------
            bool: True when the number of matches equals the total count
        """
        if _func is None:
            _func = _default_lambda

        return self.count(_func) == self.count()

    def any(self, _func: Callable = None) -> bool:
        r"""Checks if at least one element satisfies the given function

        Parameter
        ---------
            func: :class:`Callable`
                predicate; defaults to the element itself (truthiness)

        Returns
        -------
            bool
        """
        if _func is None:
            _func = _default_lambda

        return self.where(_func).count() > 0

    def average(self, _func: Callable = None) -> Union[int, float, complex]:
        r"""Returns the average of the selected values

        Parameter
        ---------
            func: :class:`Callable`
                selected value

        Returns
        -------
            Union[int, float, complex]

        Raises
        ------
            InvalidTypeException: when no function is given and the element type is not numeric
            ZeroDivisionError: when the list is empty (division by a count of 0)
        """
        if _func is None and not is_number(self.type):
            raise InvalidTypeException()

        return self.sum(_func) / self.count()

    def contains(self, _value: object) -> bool:
        r"""Checks if list contains an element equal to the given value

        Parameter
        ---------
            value: :class:`object`
                value to look for (compared with ``==``)

        Returns
        -------
            bool

        Raises
        ------
            ArgumentNoneException: when value is None
        """
        if _value is None:
            raise ArgumentNoneException(ExceptionArgument.value)

        return self.where(lambda x: x == _value).count() > 0

    def count(self, _func: Callable = None) -> int:
        r"""Returns length of list or count of elements matching the function

        Parameter
        ---------
            func: :class:`Callable`
                optional predicate

        Returns
        -------
            int
        """
        if _func is None:
            return len(self)

        return len(self.where(_func))

    def distinct(self, _func: Callable = None) -> 'QueryableABC':
        r"""Returns list without redundancies

        The first element producing a given key is kept; later ones are dropped.

        Parameter
        ---------
            func: :class:`Callable`
                key selector; defaults to the element itself

        Returns
        -------
            :class: `cpl_query.base.queryable_abc.QueryableABC`
        """
        if _func is None:
            _func = _default_lambda

        result = []
        # A list (not a set) is used so that unhashable keys are supported.
        known_values = []
        for element in self:
            value = _func(element)
            if value in known_values:
                continue

            known_values.append(value)
            result.append(element)

        return type(self)(self._type, result)

    def element_at(self, _index: int) -> Any:
        r"""Returns element at given index

        Parameter
        ---------
            _index: :class:`int`
                index

        Returns
        -------
            Value at _index: any

        Raises
        ------
            ArgumentNoneException: when index is None
            IndexOutOfRangeException: when index is negative, not smaller than
                the element count, or the stored element is None
        """
        if _index is None:
            raise ArgumentNoneException(ExceptionArgument.index)

        if _index < 0 or _index >= self.count():
            raise IndexOutOfRangeException()

        result = self[_index]
        # A stored None is treated the same as a missing element.
        if result is None:
            raise IndexOutOfRangeException()

        return result

    def element_at_or_default(self, _index: int) -> Optional[Any]:
        r"""Returns element at given index or None

        Parameter
        ---------
            _index: :class:`int`
                index

        Returns
        -------
            Value at _index, or None when indexing raises IndexError: Optional[any]

        Raises
        ------
            ArgumentNoneException: when index is None
        """
        if _index is None:
            raise ArgumentNoneException(ExceptionArgument.index)

        # NOTE(review): unlike element_at, negative indexes are not rejected
        # here; what they return depends on Sequence indexing - confirm.
        try:
            return self[_index]
        except IndexError:
            return None

    def first(self) -> Any:
        r"""Returns first element

        Returns
        -------
            First element of list: any

        Raises
        ------
            IndexOutOfRangeException: when the list is empty
        """
        if len(self) == 0:
            raise IndexOutOfRangeException()

        return self[0]

    def first_or_default(self) -> Optional[Any]:
        r"""Returns first element or None

        Returns
        -------
            First element of list, or None when the list is empty: Optional[any]
        """
        if len(self) == 0:
            return None

        return self[0]

    def for_each(self, _func: Callable = None) -> 'QueryableABC':
        r"""Runs given function for each element of list

        Parameter
        ---------
            func: :class: `Callable`
                function to call; nothing is called when it is None

        Returns
        -------
            The list itself, to allow chaining
        """
        if _func is not None:
            for element in self:
                _func(element)

        return self

    def group_by(self, _func: Callable = None) -> 'QueryableABC':
        r"""Groups elements by the key returned by func

        Groups appear in the order in which their key was first seen. Keys
        must be hashable because they are used as dictionary keys.

        Parameter
        ---------
            func: :class:`Callable`
                key selector; defaults to the element itself

        Returns
        -------
            Grouped list[list[any]]: any
        """
        if _func is None:
            _func = _default_lambda

        groups = {}
        for element in self:
            groups.setdefault(_func(element), []).append(element)

        # Each group becomes an untyped list; the outer list is typed with
        # the concrete list class itself.
        grouped = [type(self)(None, members) for members in groups.values()]
        return type(self)(type(self), grouped)

    def last(self) -> Any:
        r"""Returns last element

        Returns
        -------
            Last element of list: any

        Raises
        ------
            IndexOutOfRangeException: when the list is empty
        """
        if len(self) == 0:
            raise IndexOutOfRangeException()

        return self[len(self) - 1]

    def last_or_default(self) -> Optional[Any]:
        r"""Returns last element or None

        Returns
        -------
            Last element of list, or None when the list is empty: Optional[any]
        """
        if len(self) == 0:
            return None

        return self[len(self) - 1]

    def max(self, _func: Callable = None) -> Union[int, float, complex]:
        r"""Returns the highest selected value

        Parameter
        ---------
            func: :class:`Callable`
                selected value

        Returns
        -------
            Union[int, float, complex]

        Raises
        ------
            InvalidTypeException: when no function is given and the element type is not numeric
        """
        if _func is None:
            if not is_number(self.type):
                raise InvalidTypeException()
            _func = _default_lambda

        # max() yields the element with the greatest key; the selected value
        # of that element is what gets returned.
        return _func(max(self, key=_func))

    def median(self, _func: Callable = None) -> Union[int, float]:
        r"""Return the median of the selected values

        For an odd count the middle value is returned unchanged; for an even
        count the two middle values are converted to float and averaged.

        NOTE(review): an empty list is not handled and fails while indexing
        the sorted values.

        Parameter
        ---------
            func: :class:`Callable`
                selected value; defaults to the element itself

        Returns
        -------
            Union[int, float]
        """
        if _func is None:
            _func = _default_lambda

        result = self.order_by(_func).select(_func).to_list()
        length = len(result)
        i = int(length / 2)
        return (
            result[i]
            if length % 2 == 1
            else (float(result[i - 1]) + float(result[i])) / float(2)
        )

    def min(self, _func: Callable = None) -> Union[int, float, complex]:
        r"""Returns the lowest selected value

        Parameter
        ---------
            func: :class:`Callable`
                selected value

        Returns
        -------
            Union[int, float, complex]

        Raises
        ------
            InvalidTypeException: when no function is given and the element type is not numeric
        """
        if _func is None:
            if not is_number(self.type):
                raise InvalidTypeException()
            _func = _default_lambda

        # min() yields the element with the smallest key; the selected value
        # of that element is what gets returned.
        return _func(min(self, key=_func))

    def order_by(self, _func: Callable = None) -> 'QueryableABC':
        r"""Sorts elements by function in ascending order

        Parameter
        ---------
            func: :class:`Callable`
                sort key; defaults to the element itself

        Returns
        -------
            :class: `cpl_query.base.queryable_abc.QueryableABC`
        """
        if _func is None:
            _func = _default_lambda

        # Imported locally - presumably to avoid a circular import; confirm.
        from cpl_query.base.ordered_queryable import OrderedQueryable

        return OrderedQueryable(self.type, sorted(self, key=_func), _func)

    def order_by_descending(self, _func: Callable = None) -> 'QueryableABC':
        r"""Sorts elements by function in descending order

        Parameter
        ---------
            func: :class:`Callable`
                sort key; defaults to the element itself

        Returns
        -------
            :class: `cpl_query.base.queryable_abc.QueryableABC`
        """
        if _func is None:
            _func = _default_lambda

        # Imported locally - presumably to avoid a circular import; confirm.
        from cpl_query.base.ordered_queryable import OrderedQueryable

        return OrderedQueryable(self.type, sorted(self, key=_func, reverse=True), _func)

    def reverse(self) -> 'QueryableABC':
        r"""Returns a new list with the elements in reversed order

        Returns
        -------
            :class: `cpl_query.base.queryable_abc.QueryableABC`
        """
        return type(self)(self._type, list(reversed(self)))

    def select(self, _func: Callable) -> 'QueryableABC':
        r"""Projects each element of list through the given function

        Parameter
        ---------
            func: :class:`Callable`
                projection; None falls back to the element itself

        Returns
        -------
            :class: `cpl_query.base.queryable_abc.QueryableABC`
        """
        if _func is None:
            _func = _default_lambda

        # The builtin `any` is passed as the element type of the result.
        # NOTE(review): presumably Sequence treats it as "untyped" - confirm.
        return type(self)(any, [_func(_o) for _o in self])

    def select_many(self, _func: Callable) -> 'QueryableABC':
        r"""Projects each element to an iterable and flattens the results to one list

        Parameter
        ---------
            func: :class:`Callable`
                must return an iterable for every element

        Returns
        -------
            :class: `cpl_query.base.queryable_abc.QueryableABC`
        """
        # Nested comprehension: for every element _o, iterate over _func(_o)
        # and collect each inner item _a into a single flat list.
        return type(self)(any, [_a for _o in self for _a in _func(_o)])

    def single(self) -> Any:
        r"""Returns the one and only element of list

        Returns
        -------
            Found value: any

        Raises
        ------
            Exception: when the list is empty or holds more than one element
        """
        if len(self) > 1:
            raise Exception('Found more than one element')
        elif len(self) == 0:
            raise Exception('Found no element')

        return self[0]

    def single_or_default(self) -> Optional[Any]:
        r"""Returns the one and only element of list, or None when it is empty

        Returns
        -------
            Found value: Optional[any]

        Raises
        ------
            Exception: when the list holds more than one element
        """
        if len(self) > 1:
            raise Exception('Found more than one element')
        elif len(self) == 0:
            return None

        return self[0]

    def skip(self, _index: int) -> 'QueryableABC':
        r"""Skips the elements before index and returns the rest

        Parameter
        ---------
            _index: :class:`int`
                index of the first element to keep

        Returns
        -------
            :class: `cpl_query.base.queryable_abc.QueryableABC`

        Raises
        ------
            ArgumentNoneException: when index is None
        """
        if _index is None:
            raise ArgumentNoneException(ExceptionArgument.index)

        return type(self)(self.type, values=self[_index:])

    def skip_last(self, _index: int) -> 'QueryableABC':
        r"""Drops the last _index elements and returns the rest

        Parameter
        ---------
            _index: :class:`int`
                number of trailing elements to drop

        Returns
        -------
            :class: `cpl_query.base.queryable_abc.QueryableABC`

        Raises
        ------
            ArgumentNoneException: when index is None
        """
        if _index is None:
            raise ArgumentNoneException(ExceptionArgument.index)

        # Clamp at 0: a negative slice end would count from the back and keep
        # elements although more than len(self) were asked to be dropped.
        index = max(len(self) - _index, 0)
        return type(self)(self._type, self[:index])

    def sum(self, _func: Callable = None) -> Union[int, float, complex]:
        r"""Sum of all selected values

        Parameter
        ---------
            func: :class:`Callable`
                selected value

        Returns
        -------
            Union[int, float, complex]: 0 for an empty list

        Raises
        ------
            InvalidTypeException: when no function is given and the element type is not numeric
        """
        if _func is None:
            if not is_number(self.type):
                raise InvalidTypeException()
            _func = _default_lambda

        result = 0
        for x in self:
            result += _func(x)

        return result

    def take(self, _index: int) -> 'QueryableABC':
        r"""Takes the elements before index

        Parameter
        ---------
            _index: :class:`int`
                index of the first element that is no longer taken

        Returns
        -------
            :class: `cpl_query.base.queryable_abc.QueryableABC`

        Raises
        ------
            ArgumentNoneException: when index is None
        """
        if _index is None:
            raise ArgumentNoneException(ExceptionArgument.index)

        return type(self)(self._type, self[:_index])

    def take_last(self, _index: int) -> 'QueryableABC':
        r"""Takes the last _index elements

        Parameter
        ---------
            _index: :class:`int`
                number of trailing elements to take

        Returns
        -------
            :class: `cpl_query.base.queryable_abc.QueryableABC`

        Raises
        ------
            ArgumentNoneException: when index is None
            IndexOutOfRangeException: when _index is smaller than 1 or larger
                than the element count
        """
        if _index is None:
            raise ArgumentNoneException(ExceptionArgument.index)

        index = len(self) - _index
        if index >= len(self) or index < 0:
            raise IndexOutOfRangeException()

        return type(self)(self._type, self[index:])

    def where(self, _func: Callable = None) -> 'QueryableABC':
        r"""Selects the elements for which the function returns a truthy value

        Parameter
        ---------
            func: :class:`Callable`
                predicate

        Returns
        -------
            :class: `cpl_query.base.queryable_abc.QueryableABC`

        Raises
        ------
            ArgumentNoneException: when func is None
        """
        if _func is None:
            raise ArgumentNoneException(ExceptionArgument.func)

        return type(self)(self.type, [element for element in self if _func(element)])