Added different queries & unittests

This commit is contained in:
Sven Heidemann 2021-07-25 19:15:02 +02:00
parent 5c80cf2222
commit 4e78b9c12b
15 changed files with 283 additions and 10 deletions

View File

View File

@ -0,0 +1,7 @@
from cpl_query._query.where_query import where_query
from cpl_query.extension.iterable_abc import IterableABC
def any_query(_list: IterableABC, _func: str) -> bool:
result = where_query(_list, _func)
return len(result) > 0

View File

@ -0,0 +1,17 @@
from typing import Optional
from cpl_query.extension.iterable_abc import IterableABC
def first_query(_list: IterableABC) -> any:
if len(_list) == 0:
raise Exception('Index out of range')
return _list[0]
def first_or_default_query(_list: IterableABC) -> Optional[any]:
if len(_list) == 0:
return None
return _list[0]

View File

@ -0,0 +1,8 @@
from collections import Callable
from cpl_query.extension.iterable_abc import IterableABC
def for_each_query(_list: IterableABC, func: Callable):
for element in _list:
func(element)

View File

@ -0,0 +1,21 @@
from typing import Optional
from cpl_query.extension.iterable_abc import IterableABC
def single_query(_list: IterableABC) -> any:
if len(_list) > 1:
raise Exception('Found more than one element')
elif len(_list) == 0:
raise Exception('Found no element')
return _list[0]
def single_or_default_query(_list: IterableABC) -> Optional[any]:
if len(_list) > 1:
raise Exception('Index out of range')
elif len(_list) == 0:
return None
return _list[0]

View File

@ -0,0 +1,13 @@
from cpl_query.extension.iterable_abc import IterableABC
def where_query(_list: IterableABC, _func: str) -> IterableABC:
result = IterableABC()
for element in _list:
element_type = type(element).__name__
if element_type in _func:
func = _func.replace(element_type, 'element')
if eval(func):
result.append(element)
return result

View File

@ -20,7 +20,7 @@
],
"PythonVersion": ">=3.9.2",
"PythonPath": {
"linux": "../../cpl-env/bin/python3.9"
"linux": "../cpl-env/bin/python3.9"
},
"Classifiers": []
},

View File

View File

@ -0,0 +1,30 @@
from abc import ABC, abstractmethod
from typing import Optional, Callable
class IterableABC(ABC, list):
@abstractmethod
def __init__(self):
list.__init__(self)
@abstractmethod
def any(self, func: str) -> bool: pass
@abstractmethod
def first(self) -> any: pass
@abstractmethod
def first_or_default(self) -> any: pass
@abstractmethod
def for_each(self, func: Callable): pass
@abstractmethod
def single(self): pass
@abstractmethod
def single_or_default(self) -> Optional[any]: pass
@abstractmethod
def where(self, func: str) -> 'IterableABC': pass

View File

@ -0,0 +1,40 @@
from typing import Optional, Callable
from .._query.any_query import any_query
from .._query.first_query import first_or_default_query, first_query
from .._query.for_each_query import for_each_query
from .._query.single_query import single_query, single_or_default_query
from .._query.where_query import where_query
from cpl_query.extension.iterable_abc import IterableABC
class List(IterableABC):
def __init__(self):
IterableABC.__init__(self)
def any(self, func: str) -> bool:
return any_query(self, func)
def first(self) -> any:
return first_query(self)
def first_or_default(self) -> Optional[any]:
return first_or_default_query(self)
def for_each(self, func: Callable):
for_each_query(self, func)
def single(self) -> any:
return single_query(self)
def single_or_default(self) -> Optional[any]:
return single_or_default_query(self)
def where(self, func: str) -> IterableABC:
res = where_query(self, func)
res.__class__ = List
return res
def to_list(self) -> list:
return list(self)

View File

@ -1,9 +0,0 @@
from cpl.console import Console
def main():
Console.write_line('Hello World')
if __name__ == '__main__':
main()

View File

View File

@ -0,0 +1,12 @@
class User:
def __init__(self, name, address):
self.name = name
self.address = address
class Address:
def __init__(self, street, nr):
self.street = street
self.nr = nr

View File

@ -0,0 +1,111 @@
import string
import unittest
from random import randint
from cpl.utils import String
from cpl_query.extension.list import List
from cpl_query.tests.models import User, Address
class QueryTest(unittest.TestCase):
def setUp(self) -> None:
self._tests = List()
self._t_user = User(
'Test user',
Address(
'teststr.',
15
)
)
self._generate_test_data()
def _generate_test_data(self):
for i in range(0, 100):
user = User(
String.random_string(string.ascii_letters, 8),
Address(
String.random_string(string.ascii_letters, 10),
randint(0, 10)
)
)
self._tests.append(user)
self._tests.append(self._t_user)
def test_any(self):
results = []
for user in self._tests:
if user.address.nr == 10:
results.append(user)
res = self._tests.any(f'User.address.nr == 10')
n_res = self._tests.any(f'User.address.nr == 100')
self.assertTrue(res)
self.assertFalse(n_res)
def test_first(self):
results = []
for user in self._tests:
if user.address.nr == 10:
results.append(user)
res = self._tests.where(f'User.address.nr == 10')
s_res = self._tests.where(f'User.address.nr == 10').first()
self.assertEqual(len(res), len(results))
self.assertIsNotNone(s_res)
def test_first_or_default(self):
results = []
for user in self._tests:
if user.address.nr == 10:
results.append(user)
res = self._tests.where(f'User.address.nr == 10')
s_res = self._tests.where(f'User.address.nr == 10').first_or_default()
sn_res = self._tests.where(f'User.address.nr == 11').first_or_default()
self.assertEqual(len(res), len(results))
self.assertIsNotNone(s_res)
self.assertIsNone(sn_res)
def test_for_each(self):
users = []
self._tests.for_each(
lambda user: (
# Console.write_line(f'User: {user.name} | '),
# Console.write(f'Address: {user.address.street}'),
users.append(user)
)
)
self.assertEqual(len(users), len(self._tests))
def test_single(self):
res = self._tests.where(f'User.address.nr == {self._t_user.address.nr}')
s_res = self._tests.where(f'User.address.nr == {self._t_user.address.nr}').single()
self.assertEqual(len(res), 1)
self.assertEqual(self._t_user, s_res)
def test_single_or_default(self):
res = self._tests.where(f'User.address.nr == {self._t_user.address.nr}')
s_res = self._tests.where(f'User.address.nr == {self._t_user.address.nr}').single_or_default()
sn_res = self._tests.where(f'User.address.nr == {self._t_user.address.nr + 1}').single_or_default()
self.assertEqual(len(res), 1)
self.assertEqual(self._t_user, s_res)
self.assertIsNone(sn_res)
def test_where(self):
results = []
for user in self._tests:
if user.address.nr == 5:
results.append(user)
res = self._tests.where('User.address.nr == 5')
self.assertEqual(len(results), len(res))

View File

@ -0,0 +1,23 @@
import unittest
from cpl_query.tests.query_test import QueryTest
class Tester:
def __init__(self):
self._suite = unittest.TestSuite()
def create(self):
loader = unittest.TestLoader()
self._suite.addTests(loader.loadTestsFromTestCase(QueryTest))
def start(self):
runner = unittest.TextTestRunner()
runner.run(self._suite)
if __name__ == '__main__':
tester = Tester()
tester.create()
tester.start()