Added queries sum, skip & take

This commit is contained in:
Sven Heidemann 2021-07-27 13:57:42 +02:00
parent 2b5831c5fe
commit 37175b7227
6 changed files with 189 additions and 13 deletions

View File

@ -1,6 +1,5 @@
from typing import Optional, Callable, Union
from cpl_query._extension.ordered_iterable import OrderedIterable
from cpl_query.extension.ordered_iterable_abc import OrderedIterableABC
from .._query.all import all_query
from .._query.any import any_query
@ -15,6 +14,8 @@ from .._query.max_min import max_query, min_query
from .._query.order_by import order_by_query, order_by_descending_query
from .._query.reverse import reverse_query
from .._query.single import single_query, single_or_default_query
from .._query.skip_take import skip_query, skip_last_query, take_query, take_last_query
from .._query.sum import sum_query
from .._query.where import where_query
from cpl_query.extension.iterable_abc import IterableABC
@ -40,9 +41,7 @@ class Iterable(IterableABC):
return count_query(self, func)
def distinct(self, func: Callable) -> IterableABC:
res = distinct_query(self, func)
res.__class__ = Iterable
return res
return self.__to_self(distinct_query(self, func))
def element_at(self, index: int) -> any:
return element_at_query(self, index)
@ -73,11 +72,13 @@ class Iterable(IterableABC):
def order_by(self, func: Callable) -> OrderedIterableABC:
res = order_by_query(self, func)
from cpl_query._extension.ordered_iterable import OrderedIterable
res.__class__ = OrderedIterable
return res
def order_by_descending(self, func: Callable) -> OrderedIterableABC:
res = order_by_descending_query(self, func)
from cpl_query._extension.ordered_iterable import OrderedIterable
res.__class__ = OrderedIterable
return res
@ -90,7 +91,25 @@ class Iterable(IterableABC):
def single_or_default(self) -> Optional[any]:
return single_or_default_query(self)
def skip(self, index: int) -> IterableABC:
return self.__to_self(skip_query(self, index))
def skip_last(self, index: int) -> IterableABC:
return self.__to_self(skip_last_query(self, index))
def sum(self, func: Callable = None) -> Union[int, float, complex]:
return sum_query(self, func)
def take(self, index: int) -> IterableABC:
return self.__to_self(take_query(self, index))
def take_last(self, index: int) -> IterableABC:
return self.__to_self(take_last_query(self, index))
def where(self, func: Callable) -> IterableABC:
res = where_query(self, func)
res.__class__ = Iterable
return res
return self.__to_self(where_query(self, func))
@staticmethod
def __to_self(obj: IterableABC) -> IterableABC:
obj.__class__ = Iterable
return obj

View File

@ -1,13 +1,14 @@
from abc import ABC
from collections import Callable
from .iterable import Iterable
from .._query.order_by import then_by_query, then_by_descending_query
from cpl_query.extension.ordered_iterable_abc import OrderedIterableABC
class OrderedIterable(OrderedIterableABC, ABC):
class OrderedIterable(Iterable, OrderedIterableABC):
def __init__(self):
Iterable.__init__(self)
OrderedIterableABC.__init__(self)
def then_by(self, _func: Callable) -> OrderedIterableABC:

View File

@ -0,0 +1,66 @@
from cpl_query.exceptions import ArgumentNoneException, ExceptionArgument, IndexOutOfRangeException
from cpl_query.extension.iterable_abc import IterableABC
def skip_query(_list: IterableABC, _index: int) -> IterableABC:
if _list is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _index is None:
raise ArgumentNoneException(ExceptionArgument.index)
if _index >= len(_list):
raise IndexOutOfRangeException()
result = IterableABC()
result.extend(_list[_index:])
return result
def skip_last_query(_list: IterableABC, _index: int) -> IterableABC:
if _list is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _index is None:
raise ArgumentNoneException(ExceptionArgument.index)
index = len(_list) - _index
if index >= len(_list) or index < 0:
raise IndexOutOfRangeException()
result = IterableABC()
result.extend(_list[:index])
return result
def take_query(_list: IterableABC, _index: int) -> IterableABC:
if _list is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _index is None:
raise ArgumentNoneException(ExceptionArgument.index)
if _index >= len(_list):
raise IndexOutOfRangeException()
result = IterableABC()
result.extend(_list[:_index])
return result
def take_last_query(_list: IterableABC, _index: int) -> IterableABC:
if _list is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _index is None:
raise ArgumentNoneException(ExceptionArgument.index)
index = len(_list) - _index
if index >= len(_list) or index < 0:
raise IndexOutOfRangeException()
result = IterableABC()
result.extend(_list[index:])
return result

View File

@ -0,0 +1,25 @@
from collections import Callable
from typing import Union
from cpl_query._helper import is_number
from cpl_query.exceptions import ExceptionArgument, ArgumentNoneException, InvalidTypeException
from cpl_query.extension.iterable_abc import IterableABC
def sum_query(_list: IterableABC, _func: Callable) -> Union[int, float, complex]:
if _list is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _func is None and not is_number(_list.type):
raise InvalidTypeException()
result = 0
for element in _list:
if _func is not None:
value = _func(element)
else:
value = element
result += value
return result

View File

@ -1,5 +1,5 @@
from abc import ABC, abstractmethod
from typing import Optional, Callable, Union
from typing import Optional, Callable, Union, Iterable
class IterableABC(ABC, list):
@ -53,6 +53,10 @@ class IterableABC(ABC, list):
@abstractmethod
def element_at_or_default(self, index: int) -> Optional[any]: pass
def extend(self, __iterable: Iterable) -> None:
for value in __iterable:
self.append(value)
@abstractmethod
def last(self) -> any: pass
@ -86,11 +90,26 @@ class IterableABC(ABC, list):
@abstractmethod
def single(self) -> any: pass
def to_list(self) -> list:
return list(self)
@abstractmethod
def single_or_default(self) -> Optional[any]: pass
@abstractmethod
def skip(self, index: int) -> 'IterableABC': pass
@abstractmethod
def skip_last(self, index: int) -> 'IterableABC': pass
@abstractmethod
def sum(self, func: Callable = None) -> Union[int, float, complex]: pass
@abstractmethod
def take(self, index: int) -> 'IterableABC': pass
@abstractmethod
def take_last(self, index: int) -> 'IterableABC': pass
def to_list(self) -> list:
return list(self)
@abstractmethod
def where(self, func: Callable) -> 'IterableABC': pass

View File

@ -204,6 +204,8 @@ class QueryTest(unittest.TestCase):
s_res.sort(key=lambda user: user.address.nr)
self.assertEqual(res2, s_res)
self.assertEqual(self._t_user, res.where(lambda u: u.address.nr == self._t_user.address.nr).single())
def test_order_by_descending(self):
res = self._tests.order_by_descending(lambda user: user.address.street)
res2 = self._tests.order_by_descending(lambda user: user.address.nr)
@ -254,6 +256,50 @@ class QueryTest(unittest.TestCase):
self.assertEqual(self._t_user, s_res)
self.assertIsNone(sn_res)
def test_skip(self):
skipped = self._tests.skip(5)
self.assertEqual(len(self._tests) - 5, len(skipped))
self.assertEqual(self._tests[5:], skipped)
def test_skip_last(self):
skipped = self._tests.skip_last(5)
self.assertEqual(len(self._tests) - 5, len(skipped))
self.assertEqual(self._tests[:-5], skipped)
self.assertEqual(self._tests[:-5][len(self._tests[:-5]) - 1], skipped.last())
def test_sum(self):
res = self._tests.sum(lambda u: u.address.nr)
s_res = 0
for user in self._tests:
s_res += user.address.nr
self.assertEqual(s_res, res)
tests = List(values=list(range(0, 100)))
self.assertEqual(0, tests.min())
def invalid():
tests2 = List(str, ['hello', 'world'])
e_res = tests2.average()
self.assertRaises(InvalidTypeException, invalid)
def test_take(self):
skipped = self._tests.take(5)
self.assertEqual(5, len(skipped))
self.assertEqual(self._tests[:5], skipped)
def test_take_last(self):
skipped = self._tests.take_last(5)
self.assertEqual(5, len(skipped))
self.assertEqual(self._tests[-5:], skipped)
self.assertEqual(self._tests[len(self._tests) - 1], skipped.last())
def test_where(self):
results = []
for user in self._tests: