2022.12 #133

Merged
edraft merged 85 commits from 2022.12 into master 2022-12-25 11:58:35 +01:00
40 changed files with 578 additions and 1146 deletions
Showing only changes of commit 4dc7ee3314 - Show all commits

View File

@ -0,0 +1,2 @@
def default_lambda(x: object):
return x

View File

@ -0,0 +1,34 @@
from collections.abc import Callable
from cpl_query.base.ordered_queryable_abc import OrderedQueryableABC
from cpl_query.exceptions import ArgumentNoneException, ExceptionArgument
from cpl_query.iterable.iterable import Iterable
class OrderedQueryable(OrderedQueryableABC):
r"""Implementation of :class: `cpl_query.extension.Iterable` `cpl_query.extension.OrderedIterableABC`
"""
def __init__(self, _t: type, _values: Iterable = None, _func: Callable = None):
OrderedQueryableABC.__init__(self, _t, _values, _func)
def then_by(self: OrderedQueryableABC, _func: Callable) -> OrderedQueryableABC:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _func is None:
raise ArgumentNoneException(ExceptionArgument.func)
self._funcs.append(_func)
return OrderedQueryable(self.type, sorted(self, key=lambda *args: [f(*args) for f in self._funcs]), _func)
def then_by_descending(self: OrderedQueryableABC, _func: Callable) -> OrderedQueryableABC:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _func is None:
raise ArgumentNoneException(ExceptionArgument.func)
self._funcs.append(_func)
return OrderedQueryable(self.type, sorted(self, key=lambda *args: [f(*args) for f in self._funcs], reverse=True), _func)

View File

@ -2,20 +2,20 @@ from abc import abstractmethod
from collections.abc import Callable from collections.abc import Callable
from typing import Iterable from typing import Iterable
from cpl_query.iterable.iterable_abc import IterableABC from cpl_query.base.queryable_abc import QueryableABC
class OrderedIterableABC(IterableABC): class OrderedQueryableABC(QueryableABC):
@abstractmethod @abstractmethod
def __init__(self, _t: type, _func: Callable = None, _values: Iterable = None): def __init__(self, _t: type, _values: Iterable = None, _func: Callable = None):
IterableABC.__init__(self, _t, _values) QueryableABC.__init__(self, _t, _values)
self._funcs: list[Callable] = [] self._funcs: list[Callable] = []
if _func is not None: if _func is not None:
self._funcs.append(_func) self._funcs.append(_func)
@abstractmethod @abstractmethod
def then_by(self, func: Callable) -> 'OrderedIterableABC': def then_by(self, func: Callable) -> 'OrderedQueryableABC':
r"""Sorts OrderedList in ascending order by function r"""Sorts OrderedList in ascending order by function
Parameter Parameter
@ -29,7 +29,7 @@ class OrderedIterableABC(IterableABC):
pass pass
@abstractmethod @abstractmethod
def then_by_descending(self, func: Callable) -> 'OrderedIterableABC': def then_by_descending(self, func: Callable) -> 'OrderedQueryableABC':
r"""Sorts OrderedList in descending order by function r"""Sorts OrderedList in descending order by function
Parameter Parameter

View File

@ -1,10 +1,19 @@
from abc import abstractmethod, ABC
from typing import Optional, Callable, Union from typing import Optional, Callable, Union
from cpl_query._helper import is_number
from cpl_query.base.sequence_abc import SequenceABC
from cpl_query.exceptions import InvalidTypeException, ArgumentNoneException, ExceptionArgument, IndexOutOfRangeException
class QueryableABC(ABC):
@abstractmethod def _default_lambda(x: object):
return x
class QueryableABC(SequenceABC):
def __init__(self, t: type = None, values: list = None):
SequenceABC.__init__(self, t, values)
def all(self, _func: Callable = None) -> bool: def all(self, _func: Callable = None) -> bool:
r"""Checks if every element of list equals result found by function r"""Checks if every element of list equals result found by function
@ -17,9 +26,11 @@ class QueryableABC(ABC):
------- -------
bool bool
""" """
pass if _func is None:
_func = _default_lambda
return self.count(_func) == self.count()
@abstractmethod
def any(self, _func: Callable = None) -> bool: def any(self, _func: Callable = None) -> bool:
r"""Checks if list contains result found by function r"""Checks if list contains result found by function
@ -32,9 +43,11 @@ class QueryableABC(ABC):
------- -------
bool bool
""" """
pass if _func is None:
_func = _default_lambda
return self.where(_func).count() > 0
@abstractmethod
def average(self, _func: Callable = None) -> Union[int, float, complex]: def average(self, _func: Callable = None) -> Union[int, float, complex]:
r"""Returns average value of list r"""Returns average value of list
@ -47,10 +60,12 @@ class QueryableABC(ABC):
------- -------
Union[int, float, complex] Union[int, float, complex]
""" """
pass if _func is None and not is_number(self.type):
raise InvalidTypeException()
@abstractmethod return self.sum(_func) / self.count()
def contains(self, value: object) -> bool:
def contains(self, _value: object) -> bool:
r"""Checks if list contains value given by function r"""Checks if list contains value given by function
Parameter Parameter
@ -62,9 +77,11 @@ class QueryableABC(ABC):
------- -------
bool bool
""" """
pass if _value is None:
raise ArgumentNoneException(ExceptionArgument.value)
return self.where(lambda x: x == _value).count() > 0
@abstractmethod
def count(self, _func: Callable = None) -> int: def count(self, _func: Callable = None) -> int:
r"""Returns length of list or count of found elements r"""Returns length of list or count of found elements
@ -77,9 +94,11 @@ class QueryableABC(ABC):
------- -------
int int
""" """
pass if _func is None:
return self.__len__()
return self.where(_func).__len__()
@abstractmethod
def distinct(self, _func: Callable = None) -> 'QueryableABC': def distinct(self, _func: Callable = None) -> 'QueryableABC':
r"""Returns list without redundancies r"""Returns list without redundancies
@ -92,39 +111,62 @@ class QueryableABC(ABC):
------- -------
:class: `cpl_query.base.queryable_abc.QueryableABC` :class: `cpl_query.base.queryable_abc.QueryableABC`
""" """
pass if _func is None:
_func = _default_lambda
@abstractmethod result = []
def element_at(self, index: int) -> any: known_values = []
for element in self:
value = _func(element)
if value in known_values:
continue
known_values.append(value)
result.append(element)
return type(self)(self._type, result)
def element_at(self, _index: int) -> any:
r"""Returns element at given index r"""Returns element at given index
Parameter Parameter
--------- ---------
index: :class:`int` _index: :class:`int`
index index
Returns Returns
------- -------
Value at index: any Value at _index: any
""" """
pass if _index is None:
raise ArgumentNoneException(ExceptionArgument.index)
@abstractmethod result = self[_index]
def element_at_or_default(self, index: int) -> Optional[any]: if result is None:
raise IndexOutOfRangeException
return result
def element_at_or_default(self, _index: int) -> Optional[any]:
r"""Returns element at given index or None r"""Returns element at given index or None
Parameter Parameter
--------- ---------
index: :class:`int` _index: :class:`int`
index index
Returns Returns
------- -------
Value at index: Optional[any] Value at _index: Optional[any]
""" """
pass if _index is None:
raise ArgumentNoneException(ExceptionArgument.index)
try:
return self[_index]
except IndexError:
return None
@abstractmethod
def first(self) -> any: def first(self) -> any:
r"""Returns first element r"""Returns first element
@ -132,9 +174,11 @@ class QueryableABC(ABC):
------- -------
First element of list: any First element of list: any
""" """
pass if len(self) == 0:
raise IndexOutOfRangeException()
return self[0]
@abstractmethod
def first_or_default(self) -> any: def first_or_default(self) -> any:
r"""Returns first element or None r"""Returns first element or None
@ -142,9 +186,11 @@ class QueryableABC(ABC):
------- -------
First element of list: Optional[any] First element of list: Optional[any]
""" """
pass if len(self) == 0:
return None
return self[0]
@abstractmethod
def for_each(self, _func: Callable = None): def for_each(self, _func: Callable = None):
r"""Runs given function for each element of list r"""Runs given function for each element of list
@ -153,9 +199,36 @@ class QueryableABC(ABC):
func: :class: `Callable` func: :class: `Callable`
function to call function to call
""" """
pass if _func is not None:
for element in self:
_func(element)
return self
def group_by(self, _func: Callable = None) -> 'QueryableABC':
r"""Groups by func
Returns
-------
Grouped list[list[any]]: any
"""
if _func is None:
_func = _default_lambda
groups = {}
for v in self:
value = _func(v)
if v not in groups:
groups[value] = []
groups[value].append(v)
v = []
for g in groups.values():
v.append(type(self)(None, g))
x = type(self)(type(self), v)
return x
@abstractmethod
def last(self) -> any: def last(self) -> any:
r"""Returns last element r"""Returns last element
@ -163,9 +236,11 @@ class QueryableABC(ABC):
------- -------
Last element of list: any Last element of list: any
""" """
pass if len(self) == 0:
raise IndexOutOfRangeException()
return self[len(self) - 1]
@abstractmethod
def last_or_default(self) -> any: def last_or_default(self) -> any:
r"""Returns last element or None r"""Returns last element or None
@ -173,9 +248,11 @@ class QueryableABC(ABC):
------- -------
Last element of list: Optional[any] Last element of list: Optional[any]
""" """
pass if len(self) == 0:
return None
return self[len(self) - 1]
@abstractmethod
def max(self, _func: Callable = None) -> Union[int, float, complex]: def max(self, _func: Callable = None) -> Union[int, float, complex]:
r"""Returns the highest value r"""Returns the highest value
@ -188,19 +265,33 @@ class QueryableABC(ABC):
------- -------
Union[int, float, complex] Union[int, float, complex]
""" """
pass if _func is None and not is_number(self.type):
raise InvalidTypeException()
@abstractmethod if _func is None:
def median(self) -> Union[int, float]: _func = _default_lambda
return _func(max(self, key=_func))
def median(self, _func=None) -> Union[int, float]:
r"""Return the median value of data elements r"""Return the median value of data elements
Returns Returns
------- -------
Union[int, float] Union[int, float]
""" """
pass if _func is None:
_func = _default_lambda
result = self.order_by(_func).select(_func).to_list()
length = len(result)
i = int(length / 2)
return (
result[i]
if length % 2 == 1
else (float(result[i - 1]) + float(result[i])) / float(2)
)
@abstractmethod
def min(self, _func: Callable = None) -> Union[int, float, complex]: def min(self, _func: Callable = None) -> Union[int, float, complex]:
r"""Returns the lowest value r"""Returns the lowest value
@ -213,9 +304,14 @@ class QueryableABC(ABC):
------- -------
Union[int, float, complex] Union[int, float, complex]
""" """
pass if _func is None and not is_number(self.type):
raise InvalidTypeException()
if _func is None:
_func = _default_lambda
return _func(min(self, key=_func))
@abstractmethod
def order_by(self, _func: Callable = None) -> 'QueryableABC': def order_by(self, _func: Callable = None) -> 'QueryableABC':
r"""Sorts elements by function in ascending order r"""Sorts elements by function in ascending order
@ -228,9 +324,12 @@ class QueryableABC(ABC):
------- -------
:class: `cpl_query.base.queryable_abc.QueryableABC` :class: `cpl_query.base.queryable_abc.QueryableABC`
""" """
pass if _func is None:
_func = _default_lambda
from cpl_query.base.ordered_queryable import OrderedQueryable
return OrderedQueryable(self.type, sorted(self, key=_func), _func)
@abstractmethod
def order_by_descending(self, _func: Callable = None) -> 'QueryableABC': def order_by_descending(self, _func: Callable = None) -> 'QueryableABC':
r"""Sorts elements by function in descending order r"""Sorts elements by function in descending order
@ -243,9 +342,12 @@ class QueryableABC(ABC):
------- -------
:class: `cpl_query.base.queryable_abc.QueryableABC` :class: `cpl_query.base.queryable_abc.QueryableABC`
""" """
pass if _func is None:
_func = _default_lambda
from cpl_query.base.ordered_queryable import OrderedQueryable
return OrderedQueryable(self.type, sorted(self, key=_func, reverse=True), _func)
@abstractmethod
def reverse(self) -> 'QueryableABC': def reverse(self) -> 'QueryableABC':
r"""Reverses list r"""Reverses list
@ -253,29 +355,31 @@ class QueryableABC(ABC):
------- -------
:class: `cpl_query.base.queryable_abc.QueryableABC` :class: `cpl_query.base.queryable_abc.QueryableABC`
""" """
pass return type(self)(self._type, list(reversed(self)))
@abstractmethod def select(self, _func: Callable) -> 'QueryableABC':
def select(self, _f: Callable) -> 'QueryableABC':
r"""Formats each element of list to a given format r"""Formats each element of list to a given format
Returns Returns
------- -------
:class: `cpl_query.base.queryable_abc.QueryableABC` :class: `cpl_query.base.queryable_abc.QueryableABC`
""" """
pass if _func is None:
_func = _default_lambda
@abstractmethod return type(self)(any, [_func(_o) for _o in self])
def select_many(self, _f: Callable) -> 'QueryableABC':
def select_many(self, _func: Callable) -> 'QueryableABC':
r"""Flattens resulting lists to one r"""Flattens resulting lists to one
Returns Returns
------- -------
:class: `cpl_query.base.queryable_abc.QueryableABC` :class: `cpl_query.base.queryable_abc.QueryableABC`
""" """
pass # The line below is pain. I don't understand anything of it...
# written on 09.11.2022 by Sven Heidemann
return type(self)(any, [_a for _o in self for _a in _func(_o)])
@abstractmethod
def single(self) -> any: def single(self) -> any:
r"""Returns one single element of list r"""Returns one single element of list
@ -288,9 +392,13 @@ class QueryableABC(ABC):
ArgumentNoneException: when argument is None ArgumentNoneException: when argument is None
Exception: when argument is None or found more than one element Exception: when argument is None or found more than one element
""" """
pass if len(self) > 1:
raise Exception('Found more than one element')
elif len(self) == 0:
raise Exception('Found no element')
return self[0]
@abstractmethod
def single_or_default(self) -> Optional[any]: def single_or_default(self) -> Optional[any]:
r"""Returns one single element of list r"""Returns one single element of list
@ -298,39 +406,48 @@ class QueryableABC(ABC):
------- -------
Found value: Optional[any] Found value: Optional[any]
""" """
pass if len(self) > 1:
raise Exception('Index out of range')
elif len(self) == 0:
return None
@abstractmethod return self[0]
def skip(self, index: int) -> 'QueryableABC':
def skip(self, _index: int) -> 'QueryableABC':
r"""Skips all elements from index r"""Skips all elements from index
Parameter Parameter
--------- ---------
index: :class:`int` _index: :class:`int`
index index
Returns Returns
------- -------
:class: `cpl_query.base.queryable_abc.QueryableABC` :class: `cpl_query.base.queryable_abc.QueryableABC`
""" """
pass if _index is None:
raise ArgumentNoneException(ExceptionArgument.index)
@abstractmethod return type(self)(self.type, values=self[_index:])
def skip_last(self, index: int) -> 'QueryableABC':
def skip_last(self, _index: int) -> 'QueryableABC':
r"""Skips all elements after index r"""Skips all elements after index
Parameter Parameter
--------- ---------
index: :class:`int` _index: :class:`int`
index index
Returns Returns
------- -------
:class: `cpl_query.base.queryable_abc.QueryableABC` :class: `cpl_query.base.queryable_abc.QueryableABC`
""" """
pass if _index is None:
raise ArgumentNoneException(ExceptionArgument.index)
index = len(self) - _index
return type(self)(self._type, self[:index])
@abstractmethod
def sum(self, _func: Callable = None) -> Union[int, float, complex]: def sum(self, _func: Callable = None) -> Union[int, float, complex]:
r"""Sum of all values r"""Sum of all values
@ -343,39 +460,54 @@ class QueryableABC(ABC):
------- -------
Union[int, float, complex] Union[int, float, complex]
""" """
pass if _func is None and not is_number(self.type):
raise InvalidTypeException()
@abstractmethod if _func is None:
def take(self, index: int) -> 'QueryableABC': _func = _default_lambda
result = 0
for x in self:
result += _func(x)
return result
def take(self, _index: int) -> 'QueryableABC':
r"""Takes all elements from index r"""Takes all elements from index
Parameter Parameter
--------- ---------
index: :class:`int` _index: :class:`int`
index index
Returns Returns
------- -------
:class: `cpl_query.base.queryable_abc.QueryableABC` :class: `cpl_query.base.queryable_abc.QueryableABC`
""" """
pass if _index is None:
raise ArgumentNoneException(ExceptionArgument.index)
@abstractmethod return type(self)(self._type, self[:_index])
def take_last(self, index: int) -> 'QueryableABC':
def take_last(self, _index: int) -> 'QueryableABC':
r"""Takes all elements after index r"""Takes all elements after index
Parameter Parameter
--------- ---------
index: :class:`int` _index: :class:`int`
index index
Returns Returns
------- -------
:class: `cpl_query.base.queryable_abc.QueryableABC` :class: `cpl_query.base.queryable_abc.QueryableABC`
""" """
pass index = len(self) - _index
if index >= len(self) or index < 0:
raise IndexOutOfRangeException()
return type(self)(self._type, self[index:])
@abstractmethod
def where(self, _func: Callable = None) -> 'QueryableABC': def where(self, _func: Callable = None) -> 'QueryableABC':
r"""Select element by function r"""Select element by function
@ -388,4 +520,15 @@ class QueryableABC(ABC):
------- -------
:class: `cpl_query.base.queryable_abc.QueryableABC` :class: `cpl_query.base.queryable_abc.QueryableABC`
""" """
pass if _func is None:
raise ArgumentNoneException(ExceptionArgument.func)
if _func is None:
_func = _default_lambda
result = []
for element in self:
if _func(element):
result.append(element)
return type(self)(self.type, result)

View File

@ -0,0 +1,86 @@
from abc import ABC, abstractmethod
from itertools import islice
from cpl_query.base.sequence_values import SequenceValues
class SequenceABC(ABC):
@abstractmethod
def __init__(self, t: type = None, values: list = None):
ABC.__init__(self)
if values is None:
values = []
if t is None and len(values) > 0:
t = type(values[0])
if t is None:
t = any
self._type = t
self._set_values(values)
def __len__(self):
return len(self._values)
def __iter__(self):
return iter(self._values)
def next(self):
return next(self._values)
def __next__(self):
return self.next()
def __getitem__(self, n):
values = [x for x in self]
if isinstance(n, slice):
try:
return values[n]
except Exception as e:
raise e
for i in range(len(values)):
if i == n:
return values[i]
def __repr__(self):
return f'<{type(self).__name__} {list(self).__repr__()}>'
@property
def type(self) -> type:
return self._type
def _check_type(self, __object: any):
if self._type == any:
return
if self._type is not None and type(__object) != self._type and not isinstance(type(__object), self._type) and not issubclass(type(__object), self._type):
raise Exception(f'Unexpected type: {type(__object)}\nExpected type: {self._type}')
def _set_values(self, values: list):
self._values = SequenceValues(values, self._type)
def to_list(self) -> list:
r"""Converts :class: `cpl_query.base.sequence_abc.SequenceABC` to :class: `list`
Returns
-------
:class: `list`
"""
return [x for x in self]
@classmethod
def empty(cls) -> 'SequenceABC':
r"""Returns an empty sequence
Returns
-------
Sequence object that contains no elements
"""
return cls()
@classmethod
def range(cls, start: int, length: int) -> 'SequenceABC':
return cls(int, list(range(start, length)))

View File

@ -5,15 +5,13 @@ from cpl_query.exceptions import IndexOutOfRangeException
class SequenceValues: class SequenceValues:
def __init__(self, data, _t: type): def __init__(self, data: list, _t: type):
if data is None:
data = []
if len(data) > 0: if len(data) > 0:
def type_check(_t: type, _l: list): def type_check(_t: type, _l: list):
return all(isinstance(x, _t) for x in _l) return all([_t == any or isinstance(x, _t) for x in _l])
if not type_check(_t, data): if not type_check(_t, data):
print([type(x) for x in data])
raise Exception(f'Unexpected type\nExpected type: {_t}') raise Exception(f'Unexpected type\nExpected type: {_t}')
if not hasattr(data, '__iter__'): if not hasattr(data, '__iter__'):
@ -30,7 +28,7 @@ class SequenceValues:
def __iter__(self): def __iter__(self):
i = 0 i = 0
while i < len(self): while i < self._len():
yield next(self._cycle) yield next(self._cycle)
i += 1 i += 1

View File

@ -19,12 +19,9 @@ __version__ = '2022.10.0.post2'
from collections import namedtuple from collections import namedtuple
# imports: # imports:
from .enumerable import Enumerable from .enumerable import Enumerable
from .enumerable_abc import EnumerableABC from .enumerable_abc import EnumerableABC
from .ordered_enumerable import OrderedEnumerable
from .ordered_enumerable_abc import OrderedEnumerableABC
VersionInfo = namedtuple('VersionInfo', 'major minor micro') VersionInfo = namedtuple('VersionInfo', 'major minor micro')
version_info = VersionInfo(major='2022', minor='10', micro='0.post2') version_info = VersionInfo(major='2022', minor='10', micro='0.post2')

View File

@ -1,9 +1,4 @@
from typing import Union, Callable, Optional
from cpl_query._helper import is_number
from cpl_query.enumerable.enumerable_abc import EnumerableABC from cpl_query.enumerable.enumerable_abc import EnumerableABC
from cpl_query.enumerable.ordered_enumerable_abc import OrderedEnumerableABC
from cpl_query.exceptions import ArgumentNoneException, ExceptionArgument, InvalidTypeException, IndexOutOfRangeException
def _default_lambda(x: object): def _default_lambda(x: object):
@ -14,294 +9,5 @@ class Enumerable(EnumerableABC):
r"""Implementation of :class: `cpl_query.enumerable.enumerable_abc.EnumerableABC` r"""Implementation of :class: `cpl_query.enumerable.enumerable_abc.EnumerableABC`
""" """
def __init__(self, t: type = None, values: Union[list, iter] = None): def __init__(self, t: type = None, values: list = None):
EnumerableABC.__init__(self, t, values) EnumerableABC.__init__(self, t, values)
def all(self, _func: Callable = None) -> bool:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _func is None:
_func = _default_lambda
result = self.where(_func)
return len(result) == len(self)
def any(self, _func: Callable = None) -> bool:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _func is None:
_func = _default_lambda
result = self.where(_func)
return len(result) > 0
def average(self, _func: Callable = None) -> Union[int, float, complex]:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _func is None and not is_number(self.type):
raise InvalidTypeException()
if _func is None:
_func = _default_lambda
return float(self.sum(_func)) / float(self.count())
def contains(self, _value: object) -> bool:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _value is None:
raise ArgumentNoneException(ExceptionArgument.value)
return self.where(lambda x: x == _value).count() > 0
def count(self, _func: Callable = None) -> int:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _func is None:
return len(self)
return len(self.where(_func))
def distinct(self, _func: Callable = None) -> EnumerableABC:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _func is None:
_func = _default_lambda
result = Enumerable()
known_values = []
for element in self:
value = _func(element)
if value in known_values:
continue
known_values.append(value)
result.add(element)
return result
def element_at(self, _index: int) -> any:
self._values.reset()
while _index >= 0:
current = self.next()
if _index == 0:
return current
_index -= 1
def element_at_or_default(self, _index: int) -> any:
try:
return self.element_at(_index)
except IndexOutOfRangeException:
return None
@staticmethod
def empty() -> 'EnumerableABC':
r"""Returns an empty enumerable
Returns
-------
Enumerable object that contains no elements
"""
return Enumerable()
def first(self: EnumerableABC, _func=None) -> any:
if _func is not None:
return self.where(_func).element_at(0)
return self.element_at(0)
def first_or_default(self: EnumerableABC, _func=None) -> Optional[any]:
if _func is not None:
return self.where(_func).element_at_or_default(0)
return self.element_at_or_default(0)
def for_each(self, _func: Callable = None):
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _func is None:
_func = _default_lambda
for element in self:
_func(element)
def last(self: EnumerableABC) -> any:
return self.element_at(self.count() - 1)
def last_or_default(self: EnumerableABC) -> Optional[any]:
return self.element_at_or_default(self.count() - 1)
def max(self, _func: Callable = None) -> Union[int, float, complex]:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _func is None and not is_number(self.type):
raise InvalidTypeException()
if _func is None:
_func = _default_lambda
return _func(max(self, key=_func))
def median(self, _func=None) -> Union[int, float]:
if _func is None:
_func = _default_lambda
result = self.order_by(_func).select(_func).to_list()
length = len(result)
i = int(length / 2)
return (
result[i]
if length % 2 == 1
else (float(result[i - 1]) + float(result[i])) / float(2)
)
def min(self, _func: Callable = None) -> Union[int, float, complex]:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _func is None and not is_number(self.type):
raise InvalidTypeException()
if _func is None:
_func = _default_lambda
return _func(min(self, key=_func))
def order_by(self, _func: Callable = None) -> OrderedEnumerableABC:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _func is None:
_func = _default_lambda
from cpl_query.enumerable.ordered_enumerable import OrderedEnumerable
return OrderedEnumerable(self.type, _func, sorted(self, key=_func))
def order_by_descending(self, _func: Callable = None) -> OrderedEnumerableABC:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _func is None:
_func = _default_lambda
from cpl_query.enumerable.ordered_enumerable import OrderedEnumerable
return OrderedEnumerable(self.type, _func, sorted(self, key=_func, reverse=True))
@staticmethod
def range(start: int, length: int) -> 'EnumerableABC':
return Enumerable(int, range(start, length))
def reverse(self: EnumerableABC) -> EnumerableABC:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
return Enumerable(self.type, list(reversed(self.to_list())))
def select(self, _func: Callable = None) -> EnumerableABC:
if _func is None:
_func = _default_lambda
_l = [_func(_o) for _o in self]
return Enumerable(self._type if len(_l) < 1 else type(_l[0]), _l)
def select_many(self, _func: Callable = None) -> EnumerableABC:
if _func is None:
_func = _default_lambda
# The line below is pain. I don't understand anything of the list comprehension...
# written on 09.11.2022 by Sven Heidemann
_l = [_a for _o in self for _a in _func(_o)]
return Enumerable(self._type if len(_l) < 1 else type(_l[0]), _l)
def single(self: EnumerableABC) -> any:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if len(self) > 1:
raise IndexError('Found more than one element')
elif len(self) == 0:
raise IndexOutOfRangeException(f'{type(self).__name__} is empty')
return self.element_at(0)
def single_or_default(self: EnumerableABC) -> Optional[any]:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if len(self) > 1:
raise IndexError('Found more than one element')
elif len(self) == 0:
return None
return self.element_at(0)
def skip(self, _index: int) -> EnumerableABC:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _index is None:
raise ArgumentNoneException(ExceptionArgument.index)
_list = self.to_list()
return Enumerable(self.type, _list[_index:])
def skip_last(self, _index: int) -> EnumerableABC:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _index is None:
raise ArgumentNoneException(ExceptionArgument.index)
index = len(self) - _index
return self.take(len(self) - _index)
def take(self, _index: int) -> EnumerableABC:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _index is None:
raise ArgumentNoneException(ExceptionArgument.index)
_list = self.to_list()
return Enumerable(self.type, _list[:_index])
def take_last(self, _index: int) -> EnumerableABC:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _index is None:
raise ArgumentNoneException(ExceptionArgument.index)
_list = self.to_list()
index = len(_list) - _index
return self.skip(index)
def sum(self, _func: Callable = None) -> Union[int, float, complex]:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _func is None and not is_number(self.type):
raise InvalidTypeException()
if _func is None:
_func = _default_lambda
return sum([_func(x) for x in self])
def where(self, _func: Callable = None) -> EnumerableABC:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _func is None:
raise ArgumentNoneException(ExceptionArgument.func)
return Enumerable(self.type, list(filter(_func, self._values)))

View File

@ -1,8 +1,6 @@
from abc import abstractmethod from abc import abstractmethod
from typing import Iterable
from cpl_query.base.queryable_abc import QueryableABC from cpl_query.base.queryable_abc import QueryableABC
from cpl_query.base.sequence_values import SequenceValues
class EnumerableABC(QueryableABC): class EnumerableABC(QueryableABC):
@ -11,81 +9,15 @@ class EnumerableABC(QueryableABC):
@abstractmethod @abstractmethod
def __init__(self, t: type = None, values: list = None): def __init__(self, t: type = None, values: list = None):
if t == any or t is None and values is not None: QueryableABC.__init__(self, t, values)
t = type(values[0])
self._type, self._values, self._remove_error_check = t, SequenceValues(values, t), True self._remove_error_check = True
def __len__(self):
return len(self._values)
def __iter__(self):
return iter(self._values)
def next(self):
return next(self._values)
def __next__(self):
return self.next()
def __repr__(self):
return f'<{type(self).__name__} {list(self).__repr__()}>'
@property
def type(self) -> type:
return self._type
def set_remove_error_check(self, _value: bool): def set_remove_error_check(self, _value: bool):
r"""Set flag to check if element exists before removing r"""Set flag to check if element exists before removing
""" """
self._remove_error_check = _value self._remove_error_check = _value
def add(self, __object: object) -> None:
r"""Adds an element to the enumerable.
"""
if self._type is not None and type(__object) != self._type and not isinstance(type(__object), self._type) and not issubclass(type(__object), self._type):
raise Exception(f'Unexpected type: {type(__object)}\nExpected type: {self._type}')
if len(self) == 0 and self._type is None:
self._type = type(__object)
self._values = SequenceValues([*self._values, __object], self._type)
def clear(self):
r"""Removes all elements
"""
del self._values
self._values = []
def extend(self, __list: Iterable) -> 'EnumerableABC':
r"""Adds elements of given list to enumerable
Parameter
---------
__enumerable: :class: `cpl_query.enumerable.enumerable_abc.EnumerableABC`
index
"""
self._values = SequenceValues([*self._values, *__list], self._type)
return self
def remove(self, __object: object) -> None:
r"""Removes element from list
Parameter
---------
__object: :class:`object`
value
Raises
---------
`Element not found` when element does not exist. Check can be deactivated by calling <enumerable>.set_remove_error_check(False)
"""
if self._remove_error_check and __object not in self._values:
raise Exception('Element not found')
# self._values.remove(__object)
self._values = SequenceValues([x for x in self.to_list() if x != __object], self._type)
def to_iterable(self) -> 'IterableABC': def to_iterable(self) -> 'IterableABC':
r"""Converts :class: `cpl_query.enumerable.enumerable_abc.EnumerableABC` to :class: `cpl_query.iterable.iterable_abc.IterableABC` r"""Converts :class: `cpl_query.enumerable.enumerable_abc.EnumerableABC` to :class: `cpl_query.iterable.iterable_abc.IterableABC`
@ -95,12 +27,3 @@ class EnumerableABC(QueryableABC):
""" """
from cpl_query.iterable.iterable import Iterable from cpl_query.iterable.iterable import Iterable
return Iterable(self._type, self.to_list()) return Iterable(self._type, self.to_list())
def to_list(self) -> list:
r"""Converts :class: `cpl_query.base.sequence_abc.SequenceABC` to :class: `list`
Returns
-------
:class: `list`
"""
return [x for x in self]

View File

@ -1,36 +0,0 @@
from collections.abc import Callable
from typing import Iterable
from cpl_query.enumerable.enumerable import Enumerable
from cpl_query.enumerable.ordered_enumerable_abc import OrderedEnumerableABC
from cpl_query.exceptions import ArgumentNoneException, ExceptionArgument
class OrderedEnumerable(Enumerable, OrderedEnumerableABC):
r"""Implementation of :class: `cpl_query.extension.Enumerable` `cpl_query.extension.OrderedEnumerableABC`
"""
def __init__(self, _t: type, _func: Callable = None, _values: Iterable = None):
Enumerable.__init__(self, _t)
OrderedEnumerableABC.__init__(self, _t, _func, _values)
def then_by(self: OrderedEnumerableABC, _func: Callable) -> OrderedEnumerableABC:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _func is None:
raise ArgumentNoneException(ExceptionArgument.func)
self._funcs.append(_func)
return OrderedEnumerable(self.type, _func, sorted(self, key=lambda *args: [f(*args) for f in self._funcs]))
def then_by_descending(self: OrderedEnumerableABC, _func: Callable) -> OrderedEnumerableABC:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _func is None:
raise ArgumentNoneException(ExceptionArgument.func)
self._funcs.append(_func)
return OrderedEnumerable(self.type, _func, sorted(self, key=lambda *args: [f(*args) for f in self._funcs], reverse=True))

View File

@ -1,43 +0,0 @@
from abc import abstractmethod
from collections.abc import Callable
from typing import Iterable
from cpl_query.enumerable.enumerable_abc import EnumerableABC
class OrderedEnumerableABC(EnumerableABC):
@abstractmethod
def __init__(self, _t: type, _func: Callable = None, _values: Iterable = None):
EnumerableABC.__init__(self, _t, _values)
self._funcs: list[Callable] = []
if _func is not None:
self._funcs.append(_func)
@abstractmethod
def then_by(self, func: Callable) -> 'OrderedEnumerableABC':
r"""Sorts OrderedList in ascending order by function
Parameter
---------
func: :class:`Callable`
Returns
-------
list of :class:`cpl_query.extension.OrderedEnumerableABC`
"""
pass
@abstractmethod
def then_by_descending(self, func: Callable) -> 'OrderedEnumerableABC':
r"""Sorts OrderedList in descending order by function
Parameter
---------
func: :class:`Callable`
Returns
-------
list of :class:`cpl_query.extension.OrderedEnumerableABC`
"""
pass

View File

@ -23,8 +23,6 @@ from collections import namedtuple
# imports: # imports:
from .iterable_abc import IterableABC from .iterable_abc import IterableABC
from .iterable import Iterable from .iterable import Iterable
from .ordered_iterable_abc import OrderedIterableABC
from .ordered_iterable import OrderedIterable
VersionInfo = namedtuple('VersionInfo', 'major minor micro') VersionInfo = namedtuple('VersionInfo', 'major minor micro')
version_info = VersionInfo(major='2022', minor='10', micro='0.post2') version_info = VersionInfo(major='2022', minor='10', micro='0.post2')

View File

@ -1,9 +1,6 @@
from typing import Callable, Optional, Union, Iterable as IterableType from typing import Iterable as IterableType
from cpl_query._helper import is_number
from cpl_query.exceptions import ArgumentNoneException, ExceptionArgument, InvalidTypeException, IndexOutOfRangeException
from cpl_query.iterable.iterable_abc import IterableABC from cpl_query.iterable.iterable_abc import IterableABC
from cpl_query.iterable.ordered_iterable_abc import OrderedIterableABC
def _default_lambda(x: object): def _default_lambda(x: object):
@ -14,318 +11,3 @@ class Iterable(IterableABC):
def __init__(self, t: type = None, values: IterableType = None): def __init__(self, t: type = None, values: IterableType = None):
IterableABC.__init__(self, t, values) IterableABC.__init__(self, t, values)
def all(self, _func: Callable = None) -> bool:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _func is None:
_func = _default_lambda
result = self.where(_func)
return len(result) == len(self)
def any(self, _func: Callable = None) -> bool:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _func is None:
_func = _default_lambda
result = self.where(_func)
return len(result) > 0
def average(self, _func: Callable = None) -> Union[int, float, complex]:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _func is None and not is_number(self.type):
raise InvalidTypeException()
if _func is None:
_func = _default_lambda
return float(self.sum(_func)) / float(self.count())
def contains(self, _value: object) -> bool:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _value is None:
raise ArgumentNoneException(ExceptionArgument.value)
return self.where(lambda x: x == _value).count() > 0
def count(self, _func: Callable = None) -> int:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _func is None:
return len(self)
return len(self.where(_func))
def distinct(self, _func: Callable = None) -> IterableABC:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _func is None:
_func = _default_lambda
result = Iterable()
known_values = []
for element in self:
value = _func(element)
if value in known_values:
continue
known_values.append(value)
result.append(element)
return result
def element_at(self, _index: int) -> any:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _index is None:
raise ArgumentNoneException(ExceptionArgument.index)
return self[_index]
def element_at_or_default(self, _index: int) -> any:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _index is None:
raise ArgumentNoneException(ExceptionArgument.index)
try:
return self[_index]
except IndexError:
return None
def first(self: IterableABC) -> any:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if len(self) == 0:
raise IndexOutOfRangeException()
return self[0]
def first_or_default(self: IterableABC) -> Optional[any]:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if len(self) == 0:
return None
return self[0]
def last(self: IterableABC) -> any:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if len(self) == 0:
raise IndexOutOfRangeException()
return self[len(self) - 1]
def last_or_default(self: IterableABC) -> Optional[any]:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if len(self) == 0:
return None
return self[len(self) - 1]
def for_each(self, _func: Callable = None):
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _func is None:
_func = _default_lambda
for element in self:
_func(element)
def max(self, _func: Callable = None) -> Union[int, float, complex]:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _func is None and not is_number(self.type):
raise InvalidTypeException()
if _func is None:
_func = _default_lambda
return _func(max(self, key=_func))
def median(self, _func=None) -> Union[int, float]:
if _func is None:
_func = _default_lambda
result = self.order_by(_func).select(_func).to_list()
length = len(result)
i = int(length / 2)
return (
result[i]
if length % 2 == 1
else (float(result[i - 1]) + float(result[i])) / float(2)
)
def min(self, _func: Callable = None) -> Union[int, float, complex]:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _func is None and not is_number(self.type):
raise InvalidTypeException()
if _func is None:
_func = _default_lambda
return _func(min(self, key=_func))
def order_by(self, _func: Callable = None) -> OrderedIterableABC:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _func is None:
_func = _default_lambda
from cpl_query.iterable.ordered_iterable import OrderedIterable
return OrderedIterable(self.type, _func, sorted(self, key=_func))
def order_by_descending(self, _func: Callable = None) -> OrderedIterableABC:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _func is None:
_func = _default_lambda
from cpl_query.iterable.ordered_iterable import OrderedIterable
return OrderedIterable(self.type, _func, sorted(self, key=_func, reverse=True))
def reverse(self: IterableABC) -> IterableABC:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
return Iterable().extend(reversed(self.to_list()))
def select(self, _func: Callable = None) -> IterableABC:
if _func is None:
_func = _default_lambda
result = Iterable()
result.extend(_func(_o) for _o in self)
return result
def select_many(self, _func: Callable = None) -> IterableABC:
if _func is None:
_func = _default_lambda
result = Iterable()
# The line below is pain. I don't understand anything of it...
# written on 09.11.2022 by Sven Heidemann
elements = [_a for _o in self for _a in _func(_o)]
result.extend(elements)
return result
def single(self: IterableABC) -> any:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if len(self) > 1:
raise Exception('Found more than one element')
elif len(self) == 0:
raise Exception('Found no element')
return self[0]
def single_or_default(self: IterableABC) -> Optional[any]:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if len(self) > 1:
raise Exception('Index out of range')
elif len(self) == 0:
return None
return self[0]
def skip(self, _index: int) -> IterableABC:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _index is None:
raise ArgumentNoneException(ExceptionArgument.index)
return Iterable(self.type, values=self[_index:])
def skip_last(self, _index: int) -> IterableABC:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _index is None:
raise ArgumentNoneException(ExceptionArgument.index)
index = len(self) - _index
result = Iterable()
result.extend(self[:index])
return result
def take(self, _index: int) -> IterableABC:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _index is None:
raise ArgumentNoneException(ExceptionArgument.index)
result = Iterable()
result.extend(self[:_index])
return result
def take_last(self, _index: int) -> IterableABC:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
index = len(self) - _index
if index >= len(self) or index < 0:
raise IndexOutOfRangeException()
result = Iterable()
result.extend(self[index:])
return result
def sum(self, _func: Callable = None) -> Union[int, float, complex]:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _func is None and not is_number(self.type):
raise InvalidTypeException()
if _func is None:
_func = _default_lambda
return sum([_func(x) for x in self])
def where(self, _func: Callable = None) -> IterableABC:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _func is None:
raise ArgumentNoneException(ExceptionArgument.func)
if _func is None:
_func = _default_lambda
result = Iterable(self.type)
for element in self:
if _func(element):
result.append(element)
return result

View File

@ -4,54 +4,45 @@ from typing import Iterable
from cpl_query.base.queryable_abc import QueryableABC from cpl_query.base.queryable_abc import QueryableABC
class IterableABC(list, QueryableABC): class IterableABC(QueryableABC):
r"""ABC to define functions on list r"""ABC to define functions on list
""" """
@abstractmethod @abstractmethod
def __init__(self, t: type = None, values: Iterable = None): def __init__(self, t: type = None, values: Iterable = None):
values = [] if values is None else values QueryableABC.__init__(self, t, values)
list.__init__(self, values)
if t is None and len(values) > 0: def __setitem__(self, i, val):
t = type(values[0]) self._check_type(val)
values = [*self._values]
values[i] = val
self._set_values(values)
self._type = t def __delitem__(self, i):
values = [*self._values]
def __repr__(self): del values[i]
return f'<{type(self).__name__} {list(self).__repr__()}>' self._set_values(values)
@property @property
def type(self) -> type: def type(self) -> type:
return self._type return self._type
def to_list(self) -> list:
r"""Converts :class: `cpl_query.base.sequence_abc.SequenceABC` to :class: `list`
Returns
-------
:class: `list`
"""
return [x for x in self]
def __str__(self): def __str__(self):
return str(self.to_list()) return str(self.to_list())
def append(self, __object: object) -> None: def append(self, _object: object):
self.add(_object)
def add(self, _object: object):
r"""Adds element to list r"""Adds element to list
Parameter Parameter
--------- ---------
__object: :class:`object` _object: :class:`object`
value value
""" """
if self._type is not None and type(__object) != self._type and not isinstance(type(__object), self._type) and not issubclass(type(__object), self._type): self._check_type(_object)
raise Exception(f'Unexpected type: {type(__object)}\nExpected type: {self._type}') values = [*self._values, _object]
self._set_values(values)
if len(self) == 0 and self._type is None:
self._type = type(__object)
# self._values = SequenceValues([*self._values, __object], self._type)
super().append(__object)
def extend(self, __iterable: Iterable) -> 'IterableABC': def extend(self, __iterable: Iterable) -> 'IterableABC':
r"""Adds elements of given list to list r"""Adds elements of given list to list

View File

@ -1,35 +0,0 @@
from collections.abc import Callable
from cpl_query.exceptions import ArgumentNoneException, ExceptionArgument
from cpl_query.iterable.iterable import Iterable
from cpl_query.iterable.ordered_iterable_abc import OrderedIterableABC
class OrderedIterable(Iterable, OrderedIterableABC):
r"""Implementation of :class: `cpl_query.extension.Iterable` `cpl_query.extension.OrderedIterableABC`
"""
def __init__(self, _t: type, _func: Callable = None, _values: Iterable = None):
Iterable.__init__(self, _t)
OrderedIterableABC.__init__(self, _t, _func, _values)
def then_by(self: OrderedIterableABC, _func: Callable) -> OrderedIterableABC:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _func is None:
raise ArgumentNoneException(ExceptionArgument.func)
self._funcs.append(_func)
return OrderedIterable(self.type, _func, sorted(self, key=lambda *args: [f(*args) for f in self._funcs]))
def then_by_descending(self: OrderedIterableABC, _func: Callable) -> OrderedIterableABC:
if self is None:
raise ArgumentNoneException(ExceptionArgument.list)
if _func is None:
raise ArgumentNoneException(ExceptionArgument.func)
self._funcs.append(_func)
return OrderedIterable(self.type, _func, sorted(self, key=lambda *args: [f(*args) for f in self._funcs], reverse=True))

View File

View File

@ -0,0 +1,35 @@
import os
import shutil
import traceback
import unittest
from unittests_cli.constants import PLAYGROUND_PATH
class CommandTestCase(unittest.TestCase):
def __init__(self, method_name: str):
unittest.TestCase.__init__(self, method_name)
@classmethod
def setUpClass(cls):
try:
if os.path.exists(PLAYGROUND_PATH):
shutil.rmtree(os.path.abspath(os.path.join(PLAYGROUND_PATH)))
os.makedirs(PLAYGROUND_PATH)
os.chdir(PLAYGROUND_PATH)
except Exception as e:
print(f'Setup of {__name__} failed: {traceback.format_exc()}')
def setUp(self):
os.chdir(PLAYGROUND_PATH)
@classmethod
def tearDownClass(cls):
try:
if os.path.exists(PLAYGROUND_PATH):
shutil.rmtree(os.path.abspath(os.path.join(PLAYGROUND_PATH)))
except Exception as e:
print(f'Cleanup of {__name__} failed: {traceback.format_exc()}')

View File

@ -1,18 +1,16 @@
import json import json
import os import os
import shutil
import unittest
from cpl_core.utils import String from cpl_core.utils import String
from unittests_cli.abc.command_test_case import CommandTestCase
from unittests_cli.constants import PLAYGROUND_PATH from unittests_cli.constants import PLAYGROUND_PATH
from unittests_shared.cli_commands import CLICommands from unittests_shared.cli_commands import CLICommands
class AddTestCase(unittest.TestCase): class AddTestCase(CommandTestCase):
def __init__(self, methodName: str): def __init__(self, method_name: str):
unittest.TestCase.__init__(self, methodName) CommandTestCase.__init__(self, method_name)
self._source = 'add-test-project' self._source = 'add-test-project'
self._target = 'add-test-library' self._target = 'add-test-library'
self._project_file = f'src/{String.convert_to_snake_case(self._source)}/{self._source}.json' self._project_file = f'src/{String.convert_to_snake_case(self._source)}/{self._source}.json'
@ -26,19 +24,12 @@ class AddTestCase(unittest.TestCase):
return project_json return project_json
def setUp(self): def setUp(self):
os.chdir(os.path.abspath(PLAYGROUND_PATH)) os.chdir(PLAYGROUND_PATH)
# create projects # create projects
CLICommands.new('console', self._source, '--ab', '--s') CLICommands.new('console', self._source, '--ab', '--s')
os.chdir(os.path.join(os.getcwd(), self._source)) os.chdir(os.path.join(os.getcwd(), self._source))
CLICommands.new('console', self._target, '--ab', '--s') CLICommands.new('console', self._target, '--ab', '--s')
def cleanUp(self):
# remove projects
if not os.path.exists(os.path.abspath(os.path.join(PLAYGROUND_PATH, self._source))):
return
shutil.rmtree(os.path.abspath(os.path.join(PLAYGROUND_PATH, self._source)))
def test_add(self): def test_add(self):
CLICommands.add(self._source, self._target) CLICommands.add(self._source, self._target)
settings = self._get_project_settings() settings = self._get_project_settings()

View File

@ -2,18 +2,17 @@ import filecmp
import json import json
import os import os
import shutil import shutil
import unittest
from cpl_core.utils import String from cpl_core.utils import String
from unittests_cli.abc.command_test_case import CommandTestCase
from unittests_cli.constants import PLAYGROUND_PATH from unittests_cli.constants import PLAYGROUND_PATH
from unittests_shared.cli_commands import CLICommands from unittests_shared.cli_commands import CLICommands
class BuildTestCase(unittest.TestCase): class BuildTestCase(CommandTestCase):
def __init__(self, methodName: str): def __init__(self, method_name: str):
unittest.TestCase.__init__(self, methodName) CommandTestCase.__init__(self, method_name)
self._source = 'build-test-source' self._source = 'build-test-source'
self._project_file = f'src/{String.convert_to_snake_case(self._source)}/{self._source}.json' self._project_file = f'src/{String.convert_to_snake_case(self._source)}/{self._source}.json'
@ -31,18 +30,14 @@ class BuildTestCase(unittest.TestCase):
project_file.close() project_file.close()
def setUp(self): def setUp(self):
os.chdir(os.path.abspath(PLAYGROUND_PATH)) if not os.path.exists(PLAYGROUND_PATH):
os.makedirs(PLAYGROUND_PATH)
os.chdir(PLAYGROUND_PATH)
# create projects # create projects
CLICommands.new('console', self._source, '--ab', '--s') CLICommands.new('console', self._source, '--ab', '--s')
os.chdir(os.path.join(os.getcwd(), self._source)) os.chdir(os.path.join(os.getcwd(), self._source))
def cleanUp(self):
# remove projects
if not os.path.exists(os.path.abspath(os.path.join(PLAYGROUND_PATH, self._source))):
return
shutil.rmtree(os.path.abspath(os.path.join(PLAYGROUND_PATH, self._source)))
def _are_dir_trees_equal(self, dir1, dir2): def _are_dir_trees_equal(self, dir1, dir2):
""" """
found at https://stackoverflow.com/questions/4187564/recursively-compare-two-directories-to-ensure-they-have-the-same-files-and-subdi found at https://stackoverflow.com/questions/4187564/recursively-compare-two-directories-to-ensure-they-have-the-same-files-and-subdi

View File

@ -75,4 +75,9 @@ class CLITestSuite(unittest.TestSuite):
def run(self, *args): def run(self, *args):
self._setup() self._setup()
self._result = super().run(*args) self._result = super().run(*args)
# self._cleanup() self._cleanup()
if __name__ == "__main__":
runner = unittest.TextTestRunner()
runner.run(CLITestSuite())

View File

@ -1,5 +1,9 @@
import os import os
PLAYGROUND_PATH = os.path.abspath(os.path.join(os.getcwd(), '../test_cli_playground')) base = ''
TRANSLATION_PATH = os.path.abspath(os.path.join(os.getcwd(), '../unittests_translation')) if not os.getcwd().endswith('unittests'):
CLI_PATH = os.path.abspath(os.path.join(os.getcwd(), '../../src/cpl_cli/main.py')) base = '../'
PLAYGROUND_PATH = os.path.abspath(os.path.join(os.getcwd(), f'{base}test_cli_playground'))
TRANSLATION_PATH = os.path.abspath(os.path.join(os.getcwd(), f'{base}unittests_translation'))
CLI_PATH = os.path.abspath(os.path.join(os.getcwd(), f'{base}../src/cpl_cli/main.py'))

View File

@ -1,7 +1,7 @@
import unittest from unittests_cli.abc.command_test_case import CommandTestCase
class CustomTestCase(unittest.TestCase): class CustomTestCase(CommandTestCase):
def setUp(self): def setUp(self):
pass pass

View File

@ -1,17 +1,18 @@
import os.path import os.path
import unittest
from cpl_core.utils import String from cpl_core.utils import String
from unittests_cli.abc.command_test_case import CommandTestCase
from unittests_cli.constants import PLAYGROUND_PATH from unittests_cli.constants import PLAYGROUND_PATH
from unittests_shared.cli_commands import CLICommands from unittests_shared.cli_commands import CLICommands
class GenerateTestCase(unittest.TestCase): class GenerateTestCase(CommandTestCase):
_project = 'test-console' _project = 'test-console'
_t_path = 'test' _t_path = 'test'
@classmethod @classmethod
def setUpClass(cls): def setUpClass(cls):
CommandTestCase.setUpClass()
CLICommands.new('console', cls._project, '--ab', '--s', '--venv') CLICommands.new('console', cls._project, '--ab', '--s', '--venv')
def setUp(self): def setUp(self):

View File

@ -6,14 +6,15 @@ import sys
import unittest import unittest
from cpl_core.utils import String from cpl_core.utils import String
from unittests_cli.abc.command_test_case import CommandTestCase
from unittests_cli.constants import PLAYGROUND_PATH from unittests_cli.constants import PLAYGROUND_PATH
from unittests_shared.cli_commands import CLICommands from unittests_shared.cli_commands import CLICommands
class InstallTestCase(unittest.TestCase): class InstallTestCase(CommandTestCase):
def __init__(self, methodName: str): def __init__(self, method_name: str):
unittest.TestCase.__init__(self, methodName) CommandTestCase.__init__(self, method_name)
self._source = 'install-test-source' self._source = 'install-test-source'
self._project_file = f'src/{String.convert_to_snake_case(self._source)}/{self._source}.json' self._project_file = f'src/{String.convert_to_snake_case(self._source)}/{self._source}.json'
@ -31,18 +32,14 @@ class InstallTestCase(unittest.TestCase):
project_file.close() project_file.close()
def setUp(self): def setUp(self):
os.chdir(os.path.abspath(PLAYGROUND_PATH)) if not os.path.exists(PLAYGROUND_PATH):
os.makedirs(PLAYGROUND_PATH)
os.chdir(PLAYGROUND_PATH)
# create projects # create projects
CLICommands.new('console', self._source, '--ab', '--s') CLICommands.new('console', self._source, '--ab', '--s')
os.chdir(os.path.join(os.getcwd(), self._source)) os.chdir(os.path.join(os.getcwd(), self._source))
def cleanUp(self):
# remove projects
if not os.path.exists(os.path.abspath(os.path.join(PLAYGROUND_PATH, self._source))):
return
shutil.rmtree(os.path.abspath(os.path.join(PLAYGROUND_PATH, self._source)))
def _get_installed_packages(self) -> dict: def _get_installed_packages(self) -> dict:
reqs = subprocess.check_output([sys.executable, '-m', 'pip', 'freeze']) reqs = subprocess.check_output([sys.executable, '-m', 'pip', 'freeze'])
return dict([tuple(r.decode().split('==')) for r in reqs.split()]) return dict([tuple(r.decode().split('==')) for r in reqs.split()])

View File

@ -1,16 +1,16 @@
import json import json
import os import os
import unittest
from cpl_core.utils import String from cpl_core.utils import String
from unittests_cli.abc.command_test_case import CommandTestCase
from unittests_cli.constants import PLAYGROUND_PATH from unittests_cli.constants import PLAYGROUND_PATH
from unittests_shared.cli_commands import CLICommands from unittests_shared.cli_commands import CLICommands
class NewTestCase(unittest.TestCase): class NewTestCase(CommandTestCase):
def setUp(self): def __init__(self, method_name: str):
os.chdir(os.path.abspath(PLAYGROUND_PATH)) CommandTestCase.__init__(self, method_name)
def _test_project(self, project_type: str, name: str, *args, test_venv=False, without_ws=False): def _test_project(self, project_type: str, name: str, *args, test_venv=False, without_ws=False):
CLICommands.new(project_type, name, *args) CLICommands.new(project_type, name, *args)
@ -70,7 +70,6 @@ class NewTestCase(unittest.TestCase):
project_path = os.path.abspath(os.path.join(PLAYGROUND_PATH, workspace_name, base, String.convert_to_snake_case(name))) project_path = os.path.abspath(os.path.join(PLAYGROUND_PATH, workspace_name, base, String.convert_to_snake_case(name)))
self.assertTrue(os.path.exists(project_path)) self.assertTrue(os.path.exists(project_path))
self.assertTrue(os.path.join(project_path, f'{name}.json')) self.assertTrue(os.path.join(project_path, f'{name}.json'))
os.chdir(os.path.abspath(os.path.join(os.getcwd(), '../')))
def _test_sub_directory_project(self, project_type: str, directory: str, name: str, workspace_name: str, *args): def _test_sub_directory_project(self, project_type: str, directory: str, name: str, workspace_name: str, *args):
os.chdir(os.path.abspath(os.path.join(os.getcwd(), workspace_name))) os.chdir(os.path.abspath(os.path.join(os.getcwd(), workspace_name)))
@ -96,8 +95,6 @@ class NewTestCase(unittest.TestCase):
self.assertEqual(build_settings['Main'], f'{String.convert_to_snake_case(name)}.main') self.assertEqual(build_settings['Main'], f'{String.convert_to_snake_case(name)}.main')
self.assertEqual(build_settings['EntryPoint'], name) self.assertEqual(build_settings['EntryPoint'], name)
os.chdir(os.path.abspath(os.path.join(os.getcwd(), '../')))
def test_console(self): def test_console(self):
self._test_project('console', 'test-console', '--ab', '--s', '--venv', test_venv=True) self._test_project('console', 'test-console', '--ab', '--s', '--venv', test_venv=True)

View File

@ -5,15 +5,16 @@ import shutil
import unittest import unittest
from cpl_core.utils import String from cpl_core.utils import String
from unittests_cli.abc.command_test_case import CommandTestCase
from unittests_cli.constants import PLAYGROUND_PATH from unittests_cli.constants import PLAYGROUND_PATH
from unittests_shared.cli_commands import CLICommands from unittests_shared.cli_commands import CLICommands
class PublishTestCase(unittest.TestCase): class PublishTestCase(CommandTestCase):
def __init__(self, methodName: str): def __init__(self, method_name: str):
unittest.TestCase.__init__(self, methodName) CommandTestCase.__init__(self, method_name)
self._source = 'publish-test-source' self._source = 'publish-test-source'
self._project_file = f'src/{String.convert_to_snake_case(self._source)}/{self._source}.json' self._project_file = f'src/{String.convert_to_snake_case(self._source)}/{self._source}.json'
@ -31,18 +32,14 @@ class PublishTestCase(unittest.TestCase):
project_file.close() project_file.close()
def setUp(self): def setUp(self):
os.chdir(os.path.abspath(PLAYGROUND_PATH)) if not os.path.exists(PLAYGROUND_PATH):
os.makedirs(PLAYGROUND_PATH)
os.chdir(PLAYGROUND_PATH)
# create projects # create projects
CLICommands.new('console', self._source, '--ab', '--s') CLICommands.new('console', self._source, '--ab', '--s')
os.chdir(os.path.join(os.getcwd(), self._source)) os.chdir(os.path.join(os.getcwd(), self._source))
def cleanUp(self):
# remove projects
if not os.path.exists(os.path.abspath(os.path.join(PLAYGROUND_PATH, self._source))):
return
shutil.rmtree(os.path.abspath(os.path.join(PLAYGROUND_PATH, self._source)))
def _are_dir_trees_equal(self, dir1, dir2): def _are_dir_trees_equal(self, dir1, dir2):
""" """
found at https://stackoverflow.com/questions/4187564/recursively-compare-two-directories-to-ensure-they-have-the-same-files-and-subdi found at https://stackoverflow.com/questions/4187564/recursively-compare-two-directories-to-ensure-they-have-the-same-files-and-subdi

View File

@ -3,14 +3,15 @@ import os
import unittest import unittest
from cpl_core.utils import String from cpl_core.utils import String
from unittests_cli.abc.command_test_case import CommandTestCase
from unittests_cli.constants import PLAYGROUND_PATH from unittests_cli.constants import PLAYGROUND_PATH
from unittests_shared.cli_commands import CLICommands from unittests_shared.cli_commands import CLICommands
class RemoveTestCase(unittest.TestCase): class RemoveTestCase(CommandTestCase):
def __init__(self, methodName: str): def __init__(self, method_name: str):
unittest.TestCase.__init__(self, methodName) CommandTestCase.__init__(self, method_name)
self._source = 'add-test-project' self._source = 'add-test-project'
self._target = 'add-test-library' self._target = 'add-test-library'
self._project_file = f'src/{String.convert_to_snake_case(self._source)}/{self._source}.json' self._project_file = f'src/{String.convert_to_snake_case(self._source)}/{self._source}.json'
@ -24,7 +25,10 @@ class RemoveTestCase(unittest.TestCase):
return project_json return project_json
def setUp(self): def setUp(self):
os.chdir(os.path.abspath(PLAYGROUND_PATH)) if not os.path.exists(PLAYGROUND_PATH):
os.makedirs(PLAYGROUND_PATH)
os.chdir(PLAYGROUND_PATH)
# create projects # create projects
CLICommands.new('console', self._source, '--ab', '--s') CLICommands.new('console', self._source, '--ab', '--s')
os.chdir(os.path.join(os.getcwd(), self._source)) os.chdir(os.path.join(os.getcwd(), self._source))

View File

@ -1,22 +1,18 @@
import json import json
import os import os
import shutil import shutil
import subprocess
import sys
import unittest import unittest
import pkg_resources
from cpl_core.utils import String from cpl_core.utils import String
from unittests_cli.abc.command_test_case import CommandTestCase
from unittests_cli.constants import PLAYGROUND_PATH from unittests_cli.constants import PLAYGROUND_PATH
from unittests_shared.cli_commands import CLICommands from unittests_shared.cli_commands import CLICommands
class RunTestCase(unittest.TestCase): class RunTestCase(CommandTestCase):
def __init__(self, methodName: str): def __init__(self, method_name: str):
unittest.TestCase.__init__(self, methodName) CommandTestCase.__init__(self, method_name)
self._source = 'run-test' self._source = 'run-test'
self._project_file = f'src/{String.convert_to_snake_case(self._source)}/{self._source}.json' self._project_file = f'src/{String.convert_to_snake_case(self._source)}/{self._source}.json'
self._appsettings = f'src/{String.convert_to_snake_case(self._source)}/appsettings.json' self._appsettings = f'src/{String.convert_to_snake_case(self._source)}/appsettings.json'
@ -50,7 +46,10 @@ class RunTestCase(unittest.TestCase):
project_file.close() project_file.close()
def setUp(self): def setUp(self):
os.chdir(os.path.abspath(PLAYGROUND_PATH)) if not os.path.exists(PLAYGROUND_PATH):
os.makedirs(PLAYGROUND_PATH)
os.chdir(PLAYGROUND_PATH)
# create projects # create projects
CLICommands.new('console', self._source, '--ab', '--s') CLICommands.new('console', self._source, '--ab', '--s')
os.chdir(os.path.join(os.getcwd(), self._source)) os.chdir(os.path.join(os.getcwd(), self._source))
@ -60,13 +59,6 @@ class RunTestCase(unittest.TestCase):
file.write(f'\t\t{self._test_code}') file.write(f'\t\t{self._test_code}')
file.close() file.close()
def cleanUp(self):
# remove projects
if not os.path.exists(os.path.abspath(os.path.join(PLAYGROUND_PATH, self._source))):
return
shutil.rmtree(os.path.abspath(os.path.join(PLAYGROUND_PATH, self._source)))
def test_run(self): def test_run(self):
CLICommands.run() CLICommands.run()
settings = self._get_appsettings() settings = self._get_appsettings()

View File

@ -5,15 +5,16 @@ import time
import unittest import unittest
from cpl_core.utils import String from cpl_core.utils import String
from unittests_cli.abc.command_test_case import CommandTestCase
from unittests_cli.constants import PLAYGROUND_PATH from unittests_cli.constants import PLAYGROUND_PATH
from unittests_cli.threads.start_test_thread import StartTestThread from unittests_cli.threads.start_test_thread import StartTestThread
from unittests_shared.cli_commands import CLICommands from unittests_shared.cli_commands import CLICommands
class StartTestCase(unittest.TestCase): class StartTestCase(CommandTestCase):
def __init__(self, methodName: str): def __init__(self, method_name: str):
unittest.TestCase.__init__(self, methodName) CommandTestCase.__init__(self, method_name)
self._source = 'start-test' self._source = 'start-test'
self._project_file = f'src/{String.convert_to_snake_case(self._source)}/{self._source}.json' self._project_file = f'src/{String.convert_to_snake_case(self._source)}/{self._source}.json'
self._appsettings = f'src/{String.convert_to_snake_case(self._source)}/appsettings.json' self._appsettings = f'src/{String.convert_to_snake_case(self._source)}/appsettings.json'
@ -50,7 +51,10 @@ class StartTestCase(unittest.TestCase):
project_file.close() project_file.close()
def setUp(self): def setUp(self):
os.chdir(os.path.abspath(PLAYGROUND_PATH)) if not os.path.exists(PLAYGROUND_PATH):
os.makedirs(PLAYGROUND_PATH)
os.chdir(PLAYGROUND_PATH)
# create projects # create projects
CLICommands.new('console', self._source, '--ab', '--s') CLICommands.new('console', self._source, '--ab', '--s')
os.chdir(os.path.join(os.getcwd(), self._source)) os.chdir(os.path.join(os.getcwd(), self._source))
@ -60,13 +64,6 @@ class StartTestCase(unittest.TestCase):
file.write(f'\t\t{self._test_code}') file.write(f'\t\t{self._test_code}')
file.close() file.close()
def cleanUp(self):
# remove projects
if not os.path.exists(os.path.abspath(os.path.join(PLAYGROUND_PATH, self._source))):
return
shutil.rmtree(os.path.abspath(os.path.join(PLAYGROUND_PATH, self._source)))
def test_start(self): def test_start(self):
thread = StartTestThread() thread = StartTestThread()
thread.start() thread.start()

View File

@ -6,14 +6,15 @@ import sys
import unittest import unittest
from cpl_core.utils import String from cpl_core.utils import String
from unittests_cli.abc.command_test_case import CommandTestCase
from unittests_cli.constants import PLAYGROUND_PATH from unittests_cli.constants import PLAYGROUND_PATH
from unittests_shared.cli_commands import CLICommands from unittests_shared.cli_commands import CLICommands
class UninstallTestCase(unittest.TestCase): class UninstallTestCase(CommandTestCase):
def __init__(self, methodName: str): def __init__(self, method_name: str):
unittest.TestCase.__init__(self, methodName) CommandTestCase.__init__(self, method_name)
self._source = 'uninstall-test-source' self._source = 'uninstall-test-source'
self._project_file = f'src/{String.convert_to_snake_case(self._source)}/{self._source}.json' self._project_file = f'src/{String.convert_to_snake_case(self._source)}/{self._source}.json'
self._version = '1.7.3' self._version = '1.7.3'
@ -29,18 +30,14 @@ class UninstallTestCase(unittest.TestCase):
return project_json return project_json
def setUp(self): def setUp(self):
os.chdir(os.path.abspath(PLAYGROUND_PATH)) if not os.path.exists(PLAYGROUND_PATH):
os.makedirs(PLAYGROUND_PATH)
os.chdir(PLAYGROUND_PATH)
# create projects # create projects
CLICommands.new('console', self._source, '--ab', '--s') CLICommands.new('console', self._source, '--ab', '--s')
os.chdir(os.path.join(os.getcwd(), self._source)) os.chdir(os.path.join(os.getcwd(), self._source))
def cleanUp(self):
# remove projects
if not os.path.exists(os.path.abspath(os.path.join(PLAYGROUND_PATH, self._source))):
return
shutil.rmtree(os.path.abspath(os.path.join(PLAYGROUND_PATH, self._source)))
def _get_installed_packages(self) -> dict: def _get_installed_packages(self) -> dict:
reqs = subprocess.check_output([sys.executable, '-m', 'pip', 'freeze']) reqs = subprocess.check_output([sys.executable, '-m', 'pip', 'freeze'])
return dict([tuple(r.decode().split('==')) for r in reqs.split()]) return dict([tuple(r.decode().split('==')) for r in reqs.split()])

View File

@ -6,14 +6,15 @@ import sys
import unittest import unittest
from cpl_core.utils import String from cpl_core.utils import String
from unittests_cli.abc.command_test_case import CommandTestCase
from unittests_cli.constants import PLAYGROUND_PATH from unittests_cli.constants import PLAYGROUND_PATH
from unittests_shared.cli_commands import CLICommands from unittests_shared.cli_commands import CLICommands
class UpdateTestCase(unittest.TestCase): class UpdateTestCase(CommandTestCase):
def __init__(self, methodName: str): def __init__(self, method_name: str):
unittest.TestCase.__init__(self, methodName) CommandTestCase.__init__(self, method_name)
self._source = 'install-test-source' self._source = 'install-test-source'
self._project_file = f'src/{String.convert_to_snake_case(self._source)}/{self._source}.json' self._project_file = f'src/{String.convert_to_snake_case(self._source)}/{self._source}.json'
@ -26,6 +27,15 @@ class UpdateTestCase(unittest.TestCase):
self._new_package_name = 'discord.py' self._new_package_name = 'discord.py'
self._new_package = f'{self._new_package_name}=={self._new_version}' self._new_package = f'{self._new_package_name}=={self._new_version}'
def setUp(self):
CLICommands.uninstall(self._old_package)
CLICommands.uninstall(self._new_package)
os.chdir(PLAYGROUND_PATH)
# create projects
CLICommands.new('console', self._source, '--ab', '--s')
os.chdir(os.path.join(os.getcwd(), self._source))
CLICommands.install(self._old_package)
def _get_project_settings(self): def _get_project_settings(self):
with open(os.path.join(os.getcwd(), self._project_file), 'r', encoding='utf-8') as cfg: with open(os.path.join(os.getcwd(), self._project_file), 'r', encoding='utf-8') as cfg:
# load json # load json
@ -39,22 +49,6 @@ class UpdateTestCase(unittest.TestCase):
project_file.write(json.dumps(settings, indent=2)) project_file.write(json.dumps(settings, indent=2))
project_file.close() project_file.close()
def setUp(self):
CLICommands.uninstall(self._old_package)
CLICommands.uninstall(self._new_package)
os.chdir(os.path.abspath(PLAYGROUND_PATH))
# create projects
CLICommands.new('console', self._source, '--ab', '--s')
os.chdir(os.path.join(os.getcwd(), self._source))
CLICommands.install(self._old_package)
def cleanUp(self):
# remove projects
if not os.path.exists(os.path.abspath(os.path.join(PLAYGROUND_PATH, self._source))):
return
shutil.rmtree(os.path.abspath(os.path.join(PLAYGROUND_PATH, self._source)))
def _get_installed_packages(self) -> dict: def _get_installed_packages(self) -> dict:
reqs = subprocess.check_output([sys.executable, '-m', 'pip', 'freeze']) reqs = subprocess.check_output([sys.executable, '-m', 'pip', 'freeze'])
return dict([tuple(r.decode().split('==')) for r in reqs.split()]) return dict([tuple(r.decode().split('==')) for r in reqs.split()])
@ -85,5 +79,3 @@ class UpdateTestCase(unittest.TestCase):
packages = self._get_installed_packages() packages = self._get_installed_packages()
self.assertIn(self._new_package_name, packages) self.assertIn(self._new_package_name, packages)
self.assertEqual(self._new_version, packages[self._new_package_name]) self.assertEqual(self._new_version, packages[self._new_package_name])

View File

@ -12,13 +12,14 @@ import cpl_cli
from cpl_core.console import ForegroundColorEnum from cpl_core.console import ForegroundColorEnum
from termcolor import colored from termcolor import colored
from unittests_cli.abc.command_test_case import CommandTestCase
from unittests_shared.cli_commands import CLICommands from unittests_shared.cli_commands import CLICommands
class VersionTestCase(unittest.TestCase): class VersionTestCase(CommandTestCase):
def __init__(self, methodName: str): def __init__(self, method_name: str):
unittest.TestCase.__init__(self, methodName) CommandTestCase.__init__(self, method_name)
self._block_banner = "" self._block_banner = ""
self._block_version = "" self._block_version = ""
self._block_package_header = "" self._block_package_header = ""

View File

@ -11,7 +11,16 @@ from unittests_query.models import User, Address
class EnumerableQueryTestCase(unittest.TestCase): class EnumerableQueryTestCase(unittest.TestCase):
def setUp(self) -> None: def setUp(self) -> None:
self._tests = Enumerable(User) users = []
for i in range(0, 100):
users.append(User(
String.random_string(string.ascii_letters, 8).lower(),
Address(
String.random_string(string.ascii_letters, 10).lower(),
randint(1, 10)
)
))
self._t_user = User( self._t_user = User(
'Test user', 'Test user',
Address( Address(
@ -27,22 +36,10 @@ class EnumerableQueryTestCase(unittest.TestCase):
) )
) )
self._generate_test_data() users.append(self._t_user)
users.append(self._t_user2)
def _generate_test_data(self): self._tests = Enumerable(User, users)
for i in range(0, 100):
user = User(
String.random_string(string.ascii_letters, 8).lower(),
Address(
String.random_string(string.ascii_letters, 10).lower(),
randint(1, 10)
)
)
self._tests.add(user)
self._tests.add(self._t_user)
self._tests.add(self._t_user2)
def test_any(self): def test_any(self):
results = [] results = []

View File

@ -6,17 +6,9 @@ from cpl_query.enumerable.enumerable import Enumerable
class EnumerableTestCase(unittest.TestCase): class EnumerableTestCase(unittest.TestCase):
def setUp(self) -> None: def setUp(self) -> None:
self._list = Enumerable(int) self._list = Enumerable(int, list(range(1, 4)))
def _clear(self):
self._list.clear()
self.assertEqual(self._list, [])
def test_append(self): def test_append(self):
self._list.add(1)
self._list.add(2)
self._list.add(3)
self.assertEqual(self._list.to_list(), [1, 2, 3]) self.assertEqual(self._list.to_list(), [1, 2, 3])
self.assertRaises(Exception, lambda v: self._list.add(v), '3') self.assertRaises(Exception, lambda v: self._list.add(v), '3')
@ -38,30 +30,7 @@ class EnumerableTestCase(unittest.TestCase):
n += 1 n += 1
def test_get(self): def test_get(self):
self._list.add(1)
self._list.add(2)
self._list.add(3)
self.assertEqual(self._list.element_at(2), [1, 2, 3][2]) self.assertEqual(self._list.element_at(2), [1, 2, 3][2])
def test_count(self): def test_count(self):
self._list.add(1)
self._list.add(2)
self._list.add(3)
self.assertEqual(self._list.count(), 3) self.assertEqual(self._list.count(), 3)
def test_remove(self):
old_values = self._list._values
self._list.add(1)
self.assertNotEqual(old_values, self._list._values)
self._list.add(2)
self._list.add(3)
self.assertEqual(self._list.to_list(), [1, 2, 3])
self.assertRaises(Exception, lambda v: self._list.add(v), '3')
old_values = self._list._values
self._list.remove(3)
self.assertNotEqual(old_values, self._list._values)
self.assertEqual(self._list.to_list(), [1, 2])
self.assertRaises(Exception, lambda v: self._list.add(v), '3')

View File

@ -5,6 +5,7 @@ from random import randint
from cpl_core.utils import String from cpl_core.utils import String
from cpl_query.exceptions import InvalidTypeException, ArgumentNoneException from cpl_query.exceptions import InvalidTypeException, ArgumentNoneException
from cpl_query.extension.list import List from cpl_query.extension.list import List
from cpl_query.iterable import Iterable
from unittests_query.models import User, Address from unittests_query.models import User, Address
@ -102,8 +103,18 @@ class IterableQueryTestCase(unittest.TestCase):
self.assertEqual(1, self._tests.count(lambda u: u == self._t_user)) self.assertEqual(1, self._tests.count(lambda u: u == self._t_user))
def test_distinct(self): def test_distinct(self):
res = self._tests.distinct(lambda u: u.address.nr).where(lambda u: u.address.nr == 5) res = self._tests.select(lambda u: u.address.nr).where(lambda a: a == 5).distinct()
self.assertEqual(1, len(res)) self.assertEqual(1, res.count())
addresses = []
for u in self._tests:
if u.address.nr in addresses:
continue
addresses.append(u.address.nr)
res2 = self._tests.distinct(lambda x: x.address.nr).select(lambda x: x.address.nr).to_list()
self.assertEqual(addresses, res2)
def test_element_at(self): def test_element_at(self):
index = randint(0, len(self._tests) - 1) index = randint(0, len(self._tests) - 1)
@ -168,6 +179,29 @@ class IterableQueryTestCase(unittest.TestCase):
self.assertEqual(res[0], s_res) self.assertEqual(res[0], s_res)
self.assertIsNone(sn_res) self.assertIsNone(sn_res)
def test_group_by(self):
def by_adr(u):
return u.address.nr
t = self._tests.select(by_adr).group_by()
res = self._tests.group_by(by_adr)
self.assertTrue(isinstance(res.first_or_default(), Iterable))
self.assertNotEqual(self._tests.count(), res.count())
self.assertEqual(self._tests.distinct(by_adr).count(), res.count())
elements = List(int)
groups = {}
for x in range(0, 1000):
v = randint(1, 100)
if v not in groups:
groups[v] = []
groups[v].append(v)
elements.append(v)
r1, r2 = list(groups.values()), elements.group_by().select(lambda l: l.to_list()).to_list()
self.assertEqual(r1, r2)
def test_for_each(self): def test_for_each(self):
users = [] users = []
self._tests.for_each(lambda user: ( self._tests.for_each(lambda user: (

View File

@ -4,9 +4,15 @@ class User:
self.name = name self.name = name
self.address = address self.address = address
def __repr__(self):
return f'<{type(self).__name__} {self.name} {self.address}>'
class Address: class Address:
def __init__(self, street, nr): def __init__(self, street, nr):
self.street = street self.street = street
self.nr = nr self.nr = nr
def __repr__(self):
return f'<{type(self).__name__} {self.street} {self.nr}>'

View File

@ -19,9 +19,9 @@ class PerformanceTestCase(unittest.TestCase):
i += 1 i += 1
def test_range(self): def test_range(self):
default = timeit.timeit(lambda: list(self.values), number=COUNT) default = timeit.timeit(lambda: list(range(0, VALUES)), number=COUNT)
enumerable = timeit.timeit(lambda: Enumerable(int, self.values), number=COUNT) iterable = timeit.timeit(lambda: Iterable.range(0, VALUES), number=COUNT)
iterable = timeit.timeit(lambda: Iterable(int, self.values), number=COUNT) enumerable = timeit.timeit(lambda: Enumerable.range(0, VALUES), number=COUNT)
print('Range') print('Range')
print(f'd: {default}s') print(f'd: {default}s')
@ -32,9 +32,9 @@ class PerformanceTestCase(unittest.TestCase):
self.assertLess(default, iterable) self.assertLess(default, iterable)
def test_where_single(self): def test_where_single(self):
default = timeit.timeit(lambda: [x for x in list(self.values) if x == 50], number=COUNT) default = timeit.timeit(lambda: [x for x in list(range(0, VALUES)) if x == 50], number=COUNT)
iterable = timeit.timeit(lambda: Iterable(int, self.values).where(lambda x: x == 50).single(), number=COUNT) iterable = timeit.timeit(lambda: Iterable.range(0, VALUES).where(lambda x: x == 50).single(), number=COUNT)
enumerable = timeit.timeit(lambda: Enumerable(int, self.values).where(lambda x: x == 50).single(), number=COUNT) enumerable = timeit.timeit(lambda: Enumerable.range(0, VALUES).where(lambda x: x == 50).single(), number=COUNT)
print('Where single') print('Where single')
print(f'd: {default}s') print(f'd: {default}s')
@ -55,7 +55,7 @@ class PerformanceTestCase(unittest.TestCase):
for i in range(VALUES): for i in range(VALUES):
values.append(TestModel(i, TestModel(i + 1))) values.append(TestModel(i, TestModel(i + 1)))
default = timeit.timeit(lambda: [x for x in list(values) if x.tm.value == 50], number=COUNT) default = timeit.timeit(lambda: [x for x in values if x.tm.value == 50], number=COUNT)
iterable = timeit.timeit(lambda: Iterable(TestModel, values).where(lambda x: x.tm.value == 50).single(), number=COUNT) iterable = timeit.timeit(lambda: Iterable(TestModel, values).where(lambda x: x.tm.value == 50).single(), number=COUNT)
enumerable = timeit.timeit(lambda: Enumerable(TestModel, values).where(lambda x: x.tm.value == 50).single(), number=COUNT) enumerable = timeit.timeit(lambda: Enumerable(TestModel, values).where(lambda x: x.tm.value == 50).single(), number=COUNT)

View File

@ -25,4 +25,4 @@ class QueryTestSuite(unittest.TestSuite):
if __name__ == "__main__": if __name__ == "__main__":
runner = unittest.TextTestRunner() runner = unittest.TextTestRunner()
runner.run(QueryTestSuite()) runner.run(QueryTestSuite())

View File

@ -1,5 +1,6 @@
import os import os
import subprocess import subprocess
import sys
from unittests_cli.constants import CLI_PATH from unittests_cli.constants import CLI_PATH
@ -23,7 +24,7 @@ class CLICommands:
@staticmethod @staticmethod
def _run_with_output(cmd: str, *args) -> str: def _run_with_output(cmd: str, *args) -> str:
env_vars = os.environ env_vars = os.environ
# env_vars['CPL_IS_UNITTEST'] = 'NO' env_vars['CPL_IS_UNITTEST'] = 'NO'
command = ['python', CLI_PATH, cmd] command = ['python', CLI_PATH, cmd]
for arg in args: for arg in args:

View File

@ -1,11 +1,7 @@
import os
import shutil
import traceback
import unittest import unittest
from typing import Optional from typing import Optional
from unittest import TestResult from unittest import TestResult
from unittests_cli.constants import PLAYGROUND_PATH
from unittests_translation.translation_test_case import TranslationTestCase from unittests_translation.translation_test_case import TranslationTestCase
@ -25,27 +21,5 @@ class TranslationTestSuite(unittest.TestSuite):
for test in active_tests: for test in active_tests:
self.addTests(loader.loadTestsFromTestCase(test)) self.addTests(loader.loadTestsFromTestCase(test))
def _setup(self):
try:
if os.path.exists(PLAYGROUND_PATH):
shutil.rmtree(os.path.abspath(os.path.join(PLAYGROUND_PATH)))
os.makedirs(PLAYGROUND_PATH)
os.chdir(PLAYGROUND_PATH)
except Exception as e:
print(f'Setup of {__name__} failed: {traceback.format_exc()}')
def _cleanup(self):
try:
if self._result is not None and (len(self._result.errors) > 0 or len(self._result.failures) > 0):
return
if os.path.exists(PLAYGROUND_PATH):
shutil.rmtree(os.path.abspath(os.path.join(PLAYGROUND_PATH)))
except Exception as e:
print(f'Cleanup of {__name__} failed: {traceback.format_exc()}')
def run(self, *args): def run(self, *args):
self._setup()
self._result = super().run(*args) self._result = super().run(*args)
self._cleanup()