Files
cpl/unittests/unittests_query/iterable_query_test_case.py
2025-09-24 19:09:11 +02:00

383 lines
13 KiB
Python

import string
import unittest
from random import randint
from cpl.core.utils import String
from cpl.query.exceptions import InvalidTypeException, ArgumentNoneException
from cpl.query.extension.list import List
from cpl.query.collection import Iterable
from unittests_query.models import User, Address
class IterableQueryTestCase(unittest.TestCase):
def setUp(self) -> None:
self._tests = List(User)
self._t_user = User("Test user", Address("teststr.", 15))
self._t_user2 = User("Test user", Address("teststr.", 14))
self._generate_test_data()
def _generate_test_data(self):
for i in range(0, 100):
user = User(
String.random_string(string.ascii_letters, 8).lower(),
Address(String.random_string(string.ascii_letters, 10).lower(), randint(1, 10)),
)
self._tests.append(user)
self._tests.append(self._t_user)
self._tests.append(self._t_user2)
def test_any(self):
results = []
for user in self._tests:
if user.address.nr == 10:
results.append(user)
res = self._tests.any(lambda u: u.address.nr == 10)
n_res = self._tests.any(lambda u: u.address.nr == 100)
self.assertTrue(res)
self.assertFalse(n_res)
def test_all(self):
results = []
for user in self._tests:
if user.address.nr == 10:
results.append(user)
res = self._tests.all(lambda u: u.address is not None)
n_res = self._tests.all(lambda u: u.address.nr == 100)
self.assertTrue(res)
self.assertFalse(n_res)
def test_avg(self):
avg = 0
for user in self._tests:
avg += user.address.nr
avg = avg / len(self._tests)
res = self._tests.average(lambda u: u.address.nr)
self.assertEqual(avg, res)
def invalid():
tests = List(str, ["hello", "world"])
e_res = tests.average()
self.assertRaises(InvalidTypeException, invalid)
tests = List(int, list(range(0, 100)))
self.assertEqual(sum(tests) / len(tests), tests.average())
def wrong2():
tests2 = List(int, values=list(range(0, 100)))
e_res = tests2.average(lambda u: u.address.nr)
self.assertRaises(AttributeError, wrong2)
def test_contains(self):
self.assertTrue(self._tests.contains(self._t_user))
self.assertFalse(self._tests.contains(User("Test", None)))
def test_count(self):
self.assertEqual(len(self._tests), self._tests.count())
self.assertEqual(1, self._tests.count(lambda u: u == self._t_user))
def test_distinct(self):
res = self._tests.select(lambda u: u.address.nr).where(lambda a: a == 5).distinct()
self.assertEqual(1, res.count())
addresses = []
for u in self._tests:
if u.address.nr in addresses:
continue
addresses.append(u.address.nr)
res2 = self._tests.distinct(lambda x: x.address.nr).select(lambda x: x.address.nr).to_list()
self.assertEqual(addresses, res2)
def test_element_at(self):
index = randint(0, len(self._tests) - 1)
self.assertEqual(self._tests[index], self._tests.element_at(index))
def test_element_at_or_default(self):
index = randint(0, len(self._tests) - 1)
self.assertEqual(self._tests[index], self._tests.element_at_or_default(index))
self.assertIsNone(self._tests.element_at_or_default(len(self._tests)))
def test_last(self):
results = []
for user in self._tests:
if user.address.nr == 10:
results.append(user)
res = self._tests.where(lambda u: u.address.nr == 10)
s_res = self._tests.where(lambda u: u.address.nr == 10).last()
self.assertEqual(len(res), len(results))
self.assertEqual(res[len(res) - 1], s_res)
def test_last_or_default(self):
results = []
for user in self._tests:
if user.address.nr == 10:
results.append(user)
res = self._tests.where(lambda u: u.address.nr == 10)
s_res = self._tests.where(lambda u: u.address.nr == 10).last_or_default()
sn_res = self._tests.where(lambda u: u.address.nr == 11).last_or_default()
self.assertEqual(len(res), len(results))
self.assertEqual(res[len(res) - 1], s_res)
self.assertIsNone(sn_res)
def test_first(self):
results = []
for user in self._tests:
if user.address.nr == 10:
results.append(user)
res = self._tests.where(lambda u: u.address.nr == 10)
s_res = self._tests.where(lambda u: u.address.nr == 10).first()
self.assertEqual(len(res), len(results))
self.assertEqual(res[0], s_res)
self.assertEqual(res[0], res.first())
self.assertEqual(res.first(), res.first())
def test_first_or_default(self):
results = []
for user in self._tests:
if user.address.nr == 10:
results.append(user)
res = self._tests.where(lambda u: u.address.nr == 10)
s_res = self._tests.where(lambda u: u.address.nr == 10).first_or_default()
sn_res = self._tests.where(lambda u: u.address.nr == 11).first_or_default()
self.assertEqual(len(res), len(results))
self.assertEqual(res[0], s_res)
self.assertIsNone(sn_res)
def test_group_by(self):
def by_adr(u):
return u.address.nr
t = self._tests.select(by_adr).group_by()
res = self._tests.group_by(by_adr)
self.assertTrue(isinstance(res.first_or_default(), Iterable))
self.assertNotEqual(self._tests.count(), res.count())
self.assertEqual(self._tests.distinct(by_adr).count(), res.count())
elements = List(int)
groups = {}
for x in range(0, 1000):
v = randint(1, 100)
if v not in groups:
groups[v] = []
groups[v].append(v)
elements.append(v)
r1, r2 = list(groups.values()), elements.group_by().select(lambda l: l.to_list()).to_list()
self.assertEqual(r1, r2)
def test_for_each(self):
users = []
self._tests.for_each(lambda user: (users.append(user)))
self.assertEqual(len(users), len(self._tests))
def test_max(self):
res = self._tests.max(lambda u: u.address.nr)
self.assertEqual(res, self._t_user.address.nr)
tests = List(int, list(range(0, 100)))
self.assertEqual(99, tests.max())
def invalid():
tests = List(str, ["hello", "world"])
e_res = tests.average()
self.assertRaises(InvalidTypeException, invalid)
def test_min(self):
res = self._tests.min(lambda u: u.address.nr)
self.assertEqual(1, res)
tests = List(int, list(range(0, 100)))
self.assertEqual(0, tests.min())
def invalid():
tests = List(str, ["hello", "world"])
e_res = tests.average()
self.assertRaises(InvalidTypeException, invalid)
def test_order_by(self):
res = self._tests.order_by(lambda user: user.address.street)
res2 = self._tests.order_by(lambda user: user.address.nr).to_list()
s_res = self._tests.to_list()
s_res.sort(key=lambda user: user.address.street)
self.assertEqual(res.to_list(), s_res)
s_res = self._tests.to_list()
s_res.sort(key=lambda user: user.address.nr)
self.assertEqual(res2, s_res)
self.assertEqual(self._t_user, res.where(lambda u: u.address.nr == self._t_user.address.nr).single())
def test_order_by_descending(self):
res = self._tests.order_by_descending(lambda user: user.address.street).to_list()
res2 = self._tests.order_by_descending(lambda user: user.address.nr).to_list()
s_res = self._tests.to_list()
s_res.sort(key=lambda user: user.address.street, reverse=True)
self.assertEqual(res, s_res)
s_res = self._tests.to_list()
s_res.sort(key=lambda user: user.address.nr, reverse=True)
self.assertEqual(res2, s_res)
def test_then_by(self):
res = self._tests.order_by(lambda user: user.address.street).then_by(lambda user: user.address.nr).to_list()
s_res = self._tests.to_list()
s_res.sort(key=lambda user: (user.address.street, user.address.nr))
self.assertEqual(res, s_res)
def test_then_by_descending(self):
res = (
self._tests.order_by_descending(lambda user: user.address.street)
.then_by_descending(lambda user: user.address.nr)
.to_list()
)
s_res = self._tests.to_list()
s_res.sort(key=lambda user: (user.address.street, user.address.nr), reverse=True)
self.assertEqual(res, s_res)
def test_reverse(self):
res = self._tests.reverse()
l_res = self._tests.to_list()
l_res.reverse()
self.assertEqual(res.to_list(), l_res)
def test_select(self):
def test(_l: List) -> List[int]:
return _l.select(lambda user: user.address.nr)
self.assertEqual(List[User], self._tests.type)
self.assertEqual(List[int], test(self._tests).type)
range_list = List(int, range(0, 100))
selected_range = range_list.select(lambda x: x + 1)
modulo_range = []
for x in range(0, 100):
if x % 2 == 0:
modulo_range.append(x)
self.assertEqual(selected_range.to_list(), list(range(1, 101)))
self.assertEqual(range_list.where(lambda x: x % 2 == 0).to_list(), modulo_range)
def test_select_many(self):
range_list = List(int, list(range(0, 100)))
selected_range = range_list.select(lambda x: [x, x])
self.assertEqual(selected_range.to_list(), [[x, x] for x in range(0, 100)])
self.assertEqual(
selected_range.select_many(lambda x: x).to_list(),
[_x for _l in [2 * [x] for x in range(0, 100)] for _x in _l],
)
class TestClass:
def __init__(self, i, is_sub=False):
self.i = i
if is_sub:
return
self.elements = [TestClass(x, True) for x in range(0, 10)]
elements = List(TestClass, [TestClass(i) for i in range(0, 100)])
selected_elements = elements.select_many(lambda x: x.elements).select(lambda x: x.i)
self.assertEqual(selected_elements.where(lambda x: x == 0).count(), 100)
def test_single(self):
res = self._tests.where(lambda u: u.address.nr == self._t_user.address.nr)
s_res = self._tests.where(lambda u: u.address.nr == self._t_user.address.nr).single()
self.assertEqual(len(res), 1)
self.assertEqual(self._t_user, s_res)
def test_single_or_default(self):
res = self._tests.where(lambda u: u.address.nr == self._t_user.address.nr)
s_res = self._tests.where(lambda u: u.address.nr == self._t_user.address.nr).single_or_default()
sn_res = self._tests.where(lambda u: u.address.nr == self._t_user.address.nr + 1).single_or_default()
self.assertEqual(len(res), 1)
self.assertEqual(self._t_user, s_res)
self.assertIsNone(sn_res)
def test_skip(self):
skipped = self._tests.skip(5).to_list()
self.assertEqual(len(skipped), len(self._tests) - 5)
self.assertEqual(skipped, self._tests[5:])
def test_skip_last(self):
skipped = self._tests.skip_last(5)
self.assertEqual(skipped.count(), len(self._tests) - 5)
self.assertEqual(skipped.to_list(), self._tests[:-5])
self.assertEqual(skipped.last(), self._tests[:-5][len(self._tests[:-5]) - 1])
def test_sum(self) -> List["int"]:
res = self._tests.sum(lambda u: u.address.nr)
s_res = 0
for user in self._tests:
s_res += user.address.nr
self.assertEqual(s_res, res)
tests = List(int, list(range(0, 100)))
self.assertEqual(0, tests.min())
def invalid():
tests2 = List(str, ["hello", "world"])
e_res = tests2.average()
self.assertRaises(InvalidTypeException, invalid)
def test_take(self):
skipped = self._tests.take(5)
self.assertEqual(skipped.count(), 5)
self.assertEqual(skipped.to_list(), self._tests[:5])
def test_take_last(self):
skipped = self._tests.take_last(5)
self.assertEqual(skipped.count(), 5)
self.assertEqual(skipped.to_list(), self._tests[-5:])
self.assertEqual(skipped.last(), self._tests[len(self._tests) - 1])
def test_where(self):
results = []
for user in self._tests:
if user.address.nr == 5:
results.append(user)
res = self._tests.where(lambda u: u.address.nr == 5)
self.assertEqual(len(results), len(res))
def ex():
e_res = self._tests.where(None)
self.assertRaises(ArgumentNoneException, ex)