Compare commits
1 Commits
test/core-
...
82d7751a55
| Author | SHA1 | Date | |
|---|---|---|---|
| 82d7751a55 |
@@ -10,72 +10,30 @@ jobs:
|
||||
uses: ./.gitea/workflows/prepare.yaml
|
||||
secrets: inherit
|
||||
|
||||
api:
|
||||
uses: ./.gitea/workflows/package.yaml
|
||||
needs: [ prepare, application, auth, core, dependency ]
|
||||
with:
|
||||
working_directory: src/api
|
||||
secrets: inherit
|
||||
|
||||
application:
|
||||
uses: ./.gitea/workflows/package.yaml
|
||||
needs: [ prepare, core, dependency ]
|
||||
with:
|
||||
working_directory: src/application
|
||||
secrets: inherit
|
||||
|
||||
auth:
|
||||
uses: ./.gitea/workflows/package.yaml
|
||||
needs: [ prepare, core, dependency, database ]
|
||||
with:
|
||||
working_directory: src/auth
|
||||
secrets: inherit
|
||||
|
||||
cli:
|
||||
uses: ./.gitea/workflows/package.yaml
|
||||
needs: [ prepare, core ]
|
||||
with:
|
||||
working_directory: src/cli
|
||||
secrets: inherit
|
||||
|
||||
core:
|
||||
uses: ./.gitea/workflows/package.yaml
|
||||
needs: [prepare]
|
||||
with:
|
||||
working_directory: src/core
|
||||
secrets: inherit
|
||||
|
||||
database:
|
||||
uses: ./.gitea/workflows/package.yaml
|
||||
needs: [ prepare, core, dependency ]
|
||||
with:
|
||||
working_directory: src/database
|
||||
secrets: inherit
|
||||
|
||||
dependency:
|
||||
uses: ./.gitea/workflows/package.yaml
|
||||
needs: [ prepare, core ]
|
||||
with:
|
||||
working_directory: src/dependency
|
||||
secrets: inherit
|
||||
|
||||
mail:
|
||||
uses: ./.gitea/workflows/package.yaml
|
||||
needs: [ prepare, core, dependency ]
|
||||
with:
|
||||
working_directory: src/mail
|
||||
working_directory: src/cpl-core
|
||||
secrets: inherit
|
||||
|
||||
query:
|
||||
uses: ./.gitea/workflows/package.yaml
|
||||
needs: [prepare]
|
||||
with:
|
||||
working_directory: src/query
|
||||
working_directory: src/cpl-query
|
||||
secrets: inherit
|
||||
|
||||
translation:
|
||||
uses: ./.gitea/workflows/package.yaml
|
||||
needs: [ prepare, core, dependency ]
|
||||
needs: [ prepare, core ]
|
||||
with:
|
||||
working_directory: src/translation
|
||||
working_directory: src/cpl-translation
|
||||
secrets: inherit
|
||||
|
||||
mail:
|
||||
uses: ./.gitea/workflows/package.yaml
|
||||
needs: [ prepare, core ]
|
||||
with:
|
||||
working_directory: src/cpl-mail
|
||||
secrets: inherit
|
||||
@@ -41,7 +41,6 @@ jobs:
|
||||
source venv/bin/activate
|
||||
bash ./install.sh
|
||||
bash ./install.sh -dev
|
||||
python3.12 -m pip install pytest
|
||||
|
||||
- name: Testing with pytest
|
||||
shell: bash
|
||||
|
||||
@@ -15,5 +15,4 @@ pythonpath = [
|
||||
"src/query",
|
||||
"src/translation"
|
||||
]
|
||||
testpaths = ["test"]
|
||||
asyncio_mode = "auto"
|
||||
testpaths = ["test"]
|
||||
@@ -42,7 +42,8 @@ class UserDao(DbModelDaoABC[User]):
|
||||
|
||||
permission_dao: PermissionDao = get_provider().get_service(PermissionDao)
|
||||
p = await permission_dao.get_by_name(permission if isinstance(permission, str) else permission.value)
|
||||
result = await self._db.select_map(f"""
|
||||
result = await self._db.select_map(
|
||||
f"""
|
||||
SELECT COUNT(*) as count
|
||||
FROM {TableManager.get("role_users")} ru
|
||||
JOIN {TableManager.get("role_permissions")} rp ON ru.roleId = rp.roleId
|
||||
@@ -50,14 +51,16 @@ class UserDao(DbModelDaoABC[User]):
|
||||
AND rp.permissionId = {p.id}
|
||||
AND ru.deleted = FALSE
|
||||
AND rp.deleted = FALSE;
|
||||
""")
|
||||
"""
|
||||
)
|
||||
if result is None or len(result) == 0:
|
||||
return False
|
||||
|
||||
return result[0]["count"] > 0
|
||||
|
||||
async def get_permissions(self, user_id: int) -> list[Permission]:
|
||||
result = await self._db.select_map(f"""
|
||||
result = await self._db.select_map(
|
||||
f"""
|
||||
SELECT p.*
|
||||
FROM {TableManager.get("permissions")} p
|
||||
JOIN {TableManager.get("role_permissions")} rp ON p.id = rp.permissionId
|
||||
@@ -65,5 +68,6 @@ class UserDao(DbModelDaoABC[User]):
|
||||
WHERE ru.userId = {user_id}
|
||||
AND rp.deleted = FALSE
|
||||
AND ru.deleted = FALSE;
|
||||
""")
|
||||
"""
|
||||
)
|
||||
return [self._permissions.to_object(x) for x in result]
|
||||
|
||||
@@ -50,7 +50,8 @@ class Structure:
|
||||
if pyproject_path.exists():
|
||||
return
|
||||
|
||||
content = textwrap.dedent(f"""
|
||||
content = textwrap.dedent(
|
||||
f"""
|
||||
[build-system]
|
||||
requires = ["setuptools>=70.1.0", "wheel", "build"]
|
||||
build-backend = "setuptools.build_meta"
|
||||
@@ -61,7 +62,8 @@ class Structure:
|
||||
authors = [{{name="{project.author or ''}"}}]
|
||||
license = "{project.license or ''}"
|
||||
dependencies = [{', '.join([f'"{dep}"' for dep in project.dependencies])}]
|
||||
""").lstrip()
|
||||
"""
|
||||
).lstrip()
|
||||
|
||||
pyproject_path.write_text(content)
|
||||
|
||||
|
||||
@@ -26,9 +26,6 @@ Homepage = "https://www.sh-edraft.de"
|
||||
where = ["."]
|
||||
include = ["cpl*"]
|
||||
|
||||
[tool.setuptools.package-data]
|
||||
"cpl.cli" = [".cpl/**/*"]
|
||||
|
||||
[tool.setuptools.dynamic]
|
||||
dependencies = { file = ["requirements.txt"] }
|
||||
optional-dependencies.dev = { file = ["requirements.dev.txt"] }
|
||||
|
||||
@@ -1,2 +1 @@
|
||||
black==25.1.0
|
||||
pytest-asyncio==0.26.0
|
||||
black==25.1.0
|
||||
@@ -3,5 +3,4 @@ colorama==0.4.6
|
||||
tabulate==0.9.0
|
||||
termcolor==3.1.0
|
||||
pynput==1.8.1
|
||||
croniter==6.0.0
|
||||
cryptography==46.0.2
|
||||
croniter==6.0.0
|
||||
@@ -322,11 +322,13 @@ class DataAccessObjectABC(ABC, Generic[T_DBM]):
|
||||
Touch the entry to update the last updated date
|
||||
:return:
|
||||
"""
|
||||
await self._db.execute(f"""
|
||||
await self._db.execute(
|
||||
f"""
|
||||
UPDATE {self._table_name}
|
||||
SET updated = NOW()
|
||||
WHERE {self.__primary_key} = {self._get_primary_key_value_sql(obj)};
|
||||
""")
|
||||
"""
|
||||
)
|
||||
|
||||
async def touch_many_by_id(self, ids: list[Id]):
|
||||
"""
|
||||
@@ -336,11 +338,13 @@ class DataAccessObjectABC(ABC, Generic[T_DBM]):
|
||||
if len(ids) == 0:
|
||||
return
|
||||
|
||||
await self._db.execute(f"""
|
||||
await self._db.execute(
|
||||
f"""
|
||||
UPDATE {self._table_name}
|
||||
SET updated = NOW()
|
||||
WHERE {self.__primary_key} IN ({", ".join([str(x) for x in ids])});
|
||||
""")
|
||||
"""
|
||||
)
|
||||
|
||||
async def _build_create_statement(self, obj: T_DBM, skip_editor=False) -> str:
|
||||
allowed_fields = [x for x in self.__attributes.keys() if x not in self.__ignored_attributes]
|
||||
|
||||
@@ -56,11 +56,13 @@ class ExternalDataTempTableBuilder:
|
||||
|
||||
values_str = ", ".join([f"{value}" for value in await self._value_getter()])
|
||||
|
||||
return textwrap.dedent(f"""
|
||||
return textwrap.dedent(
|
||||
f"""
|
||||
DROP TABLE IF EXISTS {self._table_name};
|
||||
CREATE TEMP TABLE {self._table_name} (
|
||||
{", ".join([f"{k} {v}" for k, v in self._fields.items()])}
|
||||
);
|
||||
|
||||
INSERT INTO {self._table_name} VALUES {values_str};
|
||||
""")
|
||||
"""
|
||||
)
|
||||
|
||||
@@ -84,15 +84,19 @@ class MigrationService(StartupTask):
|
||||
|
||||
async def _get_tables(self):
|
||||
if ServerType == ServerTypes.POSTGRES:
|
||||
return await self._db.select("""
|
||||
return await self._db.select(
|
||||
"""
|
||||
SELECT tablename
|
||||
FROM pg_tables
|
||||
WHERE schemaname = 'public';
|
||||
""")
|
||||
"""
|
||||
)
|
||||
else:
|
||||
return await self._db.select("""
|
||||
return await self._db.select(
|
||||
"""
|
||||
SHOW TABLES;
|
||||
""")
|
||||
"""
|
||||
)
|
||||
|
||||
async def _execute(self, migrations: list[Migration]):
|
||||
result = await self._get_tables()
|
||||
|
||||
@@ -3,6 +3,7 @@ from typing import TypeVar, Union, Literal, Any
|
||||
|
||||
from cpl.database.abc.db_model_abc import DbModelABC
|
||||
|
||||
|
||||
T_DBM = TypeVar("T_DBM", bound=DbModelABC)
|
||||
|
||||
NumberFilterOperator = Literal[
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import contextvars
|
||||
from contextlib import contextmanager
|
||||
|
||||
|
||||
_current_provider = contextvars.ContextVar("current_provider", default=None)
|
||||
|
||||
|
||||
|
||||
@@ -2,7 +2,8 @@ from starlette.responses import HTMLResponse
|
||||
|
||||
|
||||
async def graphiql_endpoint(request):
|
||||
return HTMLResponse("""
|
||||
return HTMLResponse(
|
||||
"""
|
||||
<!DOCTYPE html>
|
||||
<html>
|
||||
<head>
|
||||
@@ -64,4 +65,5 @@ async def graphiql_endpoint(request):
|
||||
</script>
|
||||
</body>
|
||||
</html>
|
||||
""")
|
||||
"""
|
||||
)
|
||||
|
||||
@@ -3,7 +3,8 @@ from starlette.responses import Response, HTMLResponse
|
||||
|
||||
|
||||
async def playground_endpoint(request: Request) -> Response:
|
||||
return HTMLResponse("""
|
||||
return HTMLResponse(
|
||||
"""
|
||||
<!DOCTYPE html>
|
||||
<html>
|
||||
<head>
|
||||
@@ -24,4 +25,5 @@ async def playground_endpoint(request: Request) -> Response:
|
||||
</script>
|
||||
</body>
|
||||
</html>
|
||||
""")
|
||||
"""
|
||||
)
|
||||
|
||||
@@ -1,66 +0,0 @@
|
||||
import pytest
|
||||
from cpl.core.abc.registry_abc import RegistryABC
|
||||
|
||||
|
||||
class StringRegistry(RegistryABC[str]):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
|
||||
def extend(self, items: list[str]) -> None:
|
||||
for item in items:
|
||||
self.add(item)
|
||||
|
||||
def add(self, item: str) -> None:
|
||||
self._items[item] = item
|
||||
|
||||
def get(self, key: str) -> str | None:
|
||||
return self._items.get(key)
|
||||
|
||||
def all(self) -> list[str]:
|
||||
return list(self._items.values())
|
||||
|
||||
|
||||
def test_add_and_get():
|
||||
reg = StringRegistry()
|
||||
reg.add("hello")
|
||||
assert reg.get("hello") == "hello"
|
||||
|
||||
|
||||
def test_get_missing_returns_none():
|
||||
reg = StringRegistry()
|
||||
assert reg.get("nonexistent") is None
|
||||
|
||||
|
||||
def test_all_empty():
|
||||
reg = StringRegistry()
|
||||
assert reg.all() == []
|
||||
|
||||
|
||||
def test_all_returns_all_items():
|
||||
reg = StringRegistry()
|
||||
reg.add("a")
|
||||
reg.add("b")
|
||||
reg.add("c")
|
||||
items = reg.all()
|
||||
assert set(items) == {"a", "b", "c"}
|
||||
|
||||
|
||||
def test_extend():
|
||||
reg = StringRegistry()
|
||||
reg.extend(["x", "y", "z"])
|
||||
assert reg.get("x") == "x"
|
||||
assert reg.get("y") == "y"
|
||||
assert reg.get("z") == "z"
|
||||
assert len(reg.all()) == 3
|
||||
|
||||
|
||||
def test_overwrite_key():
|
||||
reg = StringRegistry()
|
||||
reg.add("key")
|
||||
reg.add("key")
|
||||
assert len(reg.all()) == 1
|
||||
|
||||
|
||||
def test_cannot_instantiate_abc_directly():
|
||||
with pytest.raises(TypeError):
|
||||
RegistryABC()
|
||||
@@ -1,159 +0,0 @@
|
||||
import os
|
||||
import pytest
|
||||
from cpl.core.configuration.configuration_model_abc import ConfigurationModelABC
|
||||
|
||||
|
||||
class DatabaseSettings(ConfigurationModelABC):
|
||||
def __init__(self, src: dict = None):
|
||||
ConfigurationModelABC.__init__(self, src or {})
|
||||
self.option("host", str, default="localhost")
|
||||
self.option("port", int, default=5432)
|
||||
self.option("name", str)
|
||||
self.option("debug", bool, default=False)
|
||||
|
||||
|
||||
class RequiredSettings(ConfigurationModelABC):
|
||||
def __init__(self, src: dict = None):
|
||||
ConfigurationModelABC.__init__(self, src or {})
|
||||
self.option("api_key", str, required=True)
|
||||
|
||||
|
||||
class MutableSettings(ConfigurationModelABC):
|
||||
def __init__(self, src: dict = None):
|
||||
ConfigurationModelABC.__init__(self, src or {}, readonly=False)
|
||||
self.option("value", str, default="initial")
|
||||
|
||||
|
||||
class EnvSettings(ConfigurationModelABC):
|
||||
def __init__(self, src: dict = None):
|
||||
ConfigurationModelABC.__init__(self, src or {}, env_prefix="MYAPP")
|
||||
self.option("secret", str, default=None)
|
||||
|
||||
|
||||
# --- option() / defaults ---
|
||||
|
||||
|
||||
def test_default_values():
|
||||
s = DatabaseSettings()
|
||||
assert s.host == "localhost"
|
||||
assert s.port == 5432
|
||||
assert s.debug is False
|
||||
|
||||
|
||||
def test_values_from_src():
|
||||
# NOTE: passing native bool True via src triggers cast(True, bool) which
|
||||
# fails because cast() calls value.lower() on a bool (bug in cast.py).
|
||||
# Use string "true" instead — matches real JSON-parsed values.
|
||||
s = DatabaseSettings({"host": "db.example.com", "port": 3306, "name": "mydb", "debug": "true"})
|
||||
assert s.host == "db.example.com"
|
||||
assert s.port == 3306
|
||||
assert s.name == "mydb"
|
||||
assert s.debug is True
|
||||
|
||||
|
||||
def test_pascal_case_key():
|
||||
s = DatabaseSettings({"Host": "remotehost", "Port": 1234})
|
||||
assert s.host == "remotehost"
|
||||
assert s.port == 1234
|
||||
|
||||
|
||||
def test_snake_case_key():
|
||||
s = DatabaseSettings({"host": "snakehost"})
|
||||
assert s.host == "snakehost"
|
||||
|
||||
|
||||
def test_missing_optional_is_none():
|
||||
s = DatabaseSettings()
|
||||
assert s.name is None
|
||||
|
||||
|
||||
def test_type_casting_int():
|
||||
s = DatabaseSettings({"port": "9999"})
|
||||
assert s.port == 9999
|
||||
assert isinstance(s.port, int)
|
||||
|
||||
|
||||
def test_type_casting_bool():
|
||||
s = DatabaseSettings({"debug": "true"})
|
||||
assert s.debug is True
|
||||
|
||||
|
||||
# --- required ---
|
||||
|
||||
|
||||
def test_required_field_present():
|
||||
s = RequiredSettings({"api_key": "abc123"})
|
||||
assert s.api_key == "abc123"
|
||||
|
||||
|
||||
def test_required_field_missing_raises():
|
||||
with pytest.raises(ValueError, match="required"):
|
||||
RequiredSettings()
|
||||
|
||||
|
||||
# --- readonly ---
|
||||
|
||||
|
||||
def test_readonly_raises_on_setattr():
|
||||
s = DatabaseSettings()
|
||||
with pytest.raises(AttributeError, match="read-only"):
|
||||
s.host = "newhost"
|
||||
|
||||
|
||||
def test_mutable_settings_can_be_set():
|
||||
s = MutableSettings()
|
||||
s.value = "changed"
|
||||
assert s.value == "changed"
|
||||
|
||||
|
||||
# --- env override ---
|
||||
|
||||
|
||||
def test_env_prefix_override(monkeypatch):
|
||||
monkeypatch.setenv("MYAPP_SECRET", "env_secret_value")
|
||||
s = EnvSettings()
|
||||
assert s.secret == "env_secret_value"
|
||||
|
||||
|
||||
def test_env_no_prefix_override(monkeypatch):
|
||||
monkeypatch.setenv("HOST", "env_host")
|
||||
|
||||
class NoPrefix(ConfigurationModelABC):
|
||||
def __init__(self, src=None):
|
||||
ConfigurationModelABC.__init__(self, src or {})
|
||||
self.option("host", str, default=None)
|
||||
|
||||
s = NoPrefix()
|
||||
assert s.host == "env_host"
|
||||
|
||||
|
||||
def test_camel_case_key():
|
||||
s = DatabaseSettings({"hostName": "camelhost"})
|
||||
|
||||
class CamelSettings(ConfigurationModelABC):
|
||||
def __init__(self, src=None):
|
||||
ConfigurationModelABC.__init__(self, src or {})
|
||||
self.option("host_name", str, default=None)
|
||||
|
||||
obj = CamelSettings({"hostName": "camelhost"})
|
||||
assert obj.host_name == "camelhost"
|
||||
|
||||
|
||||
# --- to_dict ---
|
||||
|
||||
|
||||
def test_to_dict_returns_dict():
|
||||
# to_dict() calls get() which calls get_value(src, field, _options[field].type, ...)
|
||||
# _options stores the cast value directly (not a typed wrapper), so .type raises AttributeError.
|
||||
# Bug: to_dict() is broken in the current implementation.
|
||||
s = DatabaseSettings({"host": "myhost", "port": "1234"})
|
||||
with pytest.raises(AttributeError):
|
||||
s.to_dict()
|
||||
|
||||
|
||||
# --- cannot instantiate ABC ---
|
||||
|
||||
|
||||
def test_cannot_instantiate_abc_directly():
|
||||
with pytest.raises(TypeError):
|
||||
ConfigurationModelABC()
|
||||
@@ -1,107 +0,0 @@
|
||||
import json
|
||||
import os
|
||||
import pytest
|
||||
from cpl.core.configuration.configuration import Configuration
|
||||
from cpl.core.configuration.configuration_model_abc import ConfigurationModelABC
|
||||
|
||||
|
||||
class AppSettings(ConfigurationModelABC):
|
||||
def __init__(self, src: dict = None):
|
||||
ConfigurationModelABC.__init__(self, src or {})
|
||||
self.option("app_name", str, default="default-app")
|
||||
self.option("version", str, default="0.0.0")
|
||||
|
||||
|
||||
class ServerSettings(ConfigurationModelABC):
|
||||
def __init__(self, src: dict = None):
|
||||
ConfigurationModelABC.__init__(self, src or {})
|
||||
self.option("host", str, default="localhost")
|
||||
self.option("port", int, default=8080)
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def clear_config():
|
||||
"""Reset Configuration state before each test."""
|
||||
Configuration._config.clear()
|
||||
yield
|
||||
Configuration._config.clear()
|
||||
|
||||
|
||||
# --- set / get ---
|
||||
|
||||
|
||||
def test_set_and_get_by_class():
|
||||
settings = AppSettings({"app_name": "TestApp", "version": "1.0.0"})
|
||||
Configuration.set(AppSettings, settings)
|
||||
result = Configuration.get(AppSettings)
|
||||
assert result.app_name == "TestApp"
|
||||
assert result.version == "1.0.0"
|
||||
|
||||
|
||||
def test_set_and_get_by_string_key():
|
||||
Configuration.set("my_key", "my_value")
|
||||
assert Configuration.get("my_key") == "my_value"
|
||||
|
||||
|
||||
def test_get_missing_returns_default():
|
||||
assert Configuration.get("nonexistent", "fallback") == "fallback"
|
||||
|
||||
|
||||
def test_get_missing_returns_none():
|
||||
assert Configuration.get("nonexistent") is None
|
||||
|
||||
|
||||
def test_get_model_auto_instantiates():
|
||||
# Getting an unregistered ConfigurationModelABC subclass should auto-create it
|
||||
result = Configuration.get(ServerSettings)
|
||||
assert isinstance(result, ServerSettings)
|
||||
assert result.host == "localhost"
|
||||
assert result.port == 8080
|
||||
|
||||
|
||||
def test_overwrite_existing():
|
||||
Configuration.set("key", "first")
|
||||
Configuration.set("key", "second")
|
||||
assert Configuration.get("key") == "second"
|
||||
|
||||
|
||||
def test_multiple_models():
|
||||
Configuration.set(AppSettings, AppSettings({"app_name": "App"}))
|
||||
Configuration.set(ServerSettings, ServerSettings({"port": 9000}))
|
||||
assert Configuration.get(AppSettings).app_name == "App"
|
||||
assert Configuration.get(ServerSettings).port == 9000
|
||||
|
||||
|
||||
# --- add_json_file ---
|
||||
|
||||
|
||||
def test_add_json_file_loads_model(tmp_path):
|
||||
config_data = {"AppSettings": {"app_name": "FromFile", "version": "2.0.0"}}
|
||||
config_file = tmp_path / "appsettings.json"
|
||||
config_file.write_text(json.dumps(config_data), encoding="utf-8")
|
||||
|
||||
Configuration.add_json_file(str(config_file), output=False)
|
||||
|
||||
result = Configuration.get(AppSettings)
|
||||
assert result.app_name == "FromFile"
|
||||
assert result.version == "2.0.0"
|
||||
|
||||
|
||||
def test_add_json_file_not_found_exits(tmp_path):
|
||||
with pytest.raises(SystemExit):
|
||||
Configuration.add_json_file(str(tmp_path / "missing.json"), output=False)
|
||||
|
||||
|
||||
def test_add_json_file_optional_missing_returns_none(tmp_path):
|
||||
result = Configuration.add_json_file(str(tmp_path / "missing.json"), optional=True, output=False)
|
||||
assert result is None
|
||||
|
||||
|
||||
def test_add_json_file_invalid_json(tmp_path):
|
||||
# _load_json_file catches parse errors and returns {} — no SystemExit.
|
||||
# add_json_file then proceeds with an empty config (no models loaded).
|
||||
bad_file = tmp_path / "bad.json"
|
||||
bad_file.write_text("not valid json", encoding="utf-8")
|
||||
Configuration.add_json_file(str(bad_file), output=False)
|
||||
# No model should be registered since the file was invalid
|
||||
assert Configuration.get("AppSettings") is None
|
||||
@@ -1,101 +0,0 @@
|
||||
import pytest
|
||||
from cpl.core.console.foreground_color_enum import ForegroundColorEnum
|
||||
from cpl.core.console.background_color_enum import BackgroundColorEnum
|
||||
from cpl.core.console.console import Console
|
||||
|
||||
# --- ForegroundColorEnum ---
|
||||
|
||||
|
||||
def test_foreground_color_enum_has_expected_values():
|
||||
values = [e.value for e in ForegroundColorEnum]
|
||||
assert "default" in values
|
||||
assert "grey" in values
|
||||
assert "red" in values
|
||||
assert "green" in values
|
||||
assert "yellow" in values
|
||||
assert "blue" in values
|
||||
assert "magenta" in values
|
||||
assert "cyan" in values
|
||||
assert "white" in values
|
||||
|
||||
|
||||
def test_foreground_color_enum_count():
|
||||
assert len(ForegroundColorEnum) >= 9
|
||||
|
||||
|
||||
def test_foreground_color_enum_by_name():
|
||||
assert ForegroundColorEnum["red"] == ForegroundColorEnum.red
|
||||
assert ForegroundColorEnum["green"] == ForegroundColorEnum.green
|
||||
|
||||
|
||||
# --- BackgroundColorEnum ---
|
||||
|
||||
|
||||
def test_background_color_enum_has_expected_values():
|
||||
values = [e.value for e in BackgroundColorEnum]
|
||||
assert "on_default" in values
|
||||
assert "on_red" in values
|
||||
assert "on_green" in values
|
||||
assert "on_blue" in values
|
||||
|
||||
|
||||
def test_background_color_enum_count():
|
||||
assert len(BackgroundColorEnum) >= 9
|
||||
|
||||
|
||||
def test_background_color_enum_by_name():
|
||||
assert BackgroundColorEnum["red"] == BackgroundColorEnum.red
|
||||
|
||||
|
||||
# --- Console basic methods (non-interactive, no terminal required) ---
|
||||
|
||||
|
||||
def test_console_write_does_not_raise(capsys):
|
||||
Console.write("test")
|
||||
captured = capsys.readouterr()
|
||||
assert "test" in captured.out
|
||||
|
||||
|
||||
def test_console_write_line_does_not_raise(capsys):
|
||||
Console.write_line("hello world")
|
||||
captured = capsys.readouterr()
|
||||
assert "hello world" in captured.out
|
||||
|
||||
|
||||
def test_console_write_line_multiple_args(capsys):
|
||||
Console.write_line("a", "b", "c")
|
||||
captured = capsys.readouterr()
|
||||
assert "a" in captured.out
|
||||
|
||||
|
||||
def test_console_error_does_not_raise(capsys):
|
||||
Console.error("something went wrong")
|
||||
captured = capsys.readouterr()
|
||||
assert "something went wrong" in captured.out or "something went wrong" in captured.err
|
||||
|
||||
|
||||
def test_console_set_foreground_color_does_not_raise():
|
||||
Console.set_foreground_color(ForegroundColorEnum.red)
|
||||
Console.set_foreground_color(ForegroundColorEnum.default)
|
||||
|
||||
|
||||
def test_console_set_background_color_does_not_raise():
|
||||
Console.set_background_color(BackgroundColorEnum.blue)
|
||||
Console.set_background_color(BackgroundColorEnum.default)
|
||||
|
||||
|
||||
def test_console_color_reset_does_not_raise():
|
||||
Console.color_reset()
|
||||
|
||||
|
||||
def test_console_banner_does_not_raise(capsys):
|
||||
# banner() renders text as ASCII art, not verbatim
|
||||
Console.banner("Test Banner")
|
||||
captured = capsys.readouterr()
|
||||
assert len(captured.out) > 0
|
||||
|
||||
|
||||
def test_console_divider_does_not_raise(capsys):
|
||||
Console.divider()
|
||||
captured = capsys.readouterr()
|
||||
assert len(captured.out) > 0
|
||||
@@ -1,104 +0,0 @@
|
||||
import os
|
||||
import pytest
|
||||
from cpl.core.environment.environment import Environment
|
||||
from cpl.core.environment.environment_enum import EnvironmentEnum
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def restore_env():
|
||||
"""Restore touched env vars after each test to prevent pollution."""
|
||||
original = dict(os.environ)
|
||||
yield
|
||||
for key in list(os.environ.keys()):
|
||||
if key not in original:
|
||||
del os.environ[key]
|
||||
elif os.environ[key] != original[key]:
|
||||
os.environ[key] = original[key]
|
||||
|
||||
|
||||
def test_set_and_get_environment_valid():
|
||||
for env in EnvironmentEnum:
|
||||
Environment.set_environment(env.value)
|
||||
assert Environment.get_environment() == env.value
|
||||
|
||||
|
||||
def test_set_environment_invalid():
|
||||
with pytest.raises(AssertionError):
|
||||
Environment.set_environment("invalid_env")
|
||||
|
||||
|
||||
def test_set_environment_empty():
|
||||
with pytest.raises(AssertionError):
|
||||
Environment.set_environment("")
|
||||
|
||||
|
||||
def test_set_environment_none():
|
||||
with pytest.raises(AssertionError):
|
||||
Environment.set_environment(None)
|
||||
|
||||
|
||||
def test_set_and_get_app_name():
|
||||
Environment.set_app_name("TestApp")
|
||||
assert Environment.get_app_name() == "TestApp"
|
||||
|
||||
|
||||
def test_get_host_name():
|
||||
hostname = Environment.get_host_name()
|
||||
assert isinstance(hostname, str)
|
||||
assert len(hostname) > 0
|
||||
|
||||
|
||||
def test_get_cwd():
|
||||
cwd = Environment.get_cwd()
|
||||
assert isinstance(cwd, str)
|
||||
assert os.path.isdir(cwd)
|
||||
|
||||
|
||||
def test_set_cwd(tmp_path):
|
||||
original = Environment.get_cwd()
|
||||
Environment.set_cwd(str(tmp_path))
|
||||
assert Environment.get_cwd() == str(tmp_path)
|
||||
Environment.set_cwd(original)
|
||||
|
||||
|
||||
def test_set_cwd_empty():
|
||||
with pytest.raises(AssertionError):
|
||||
Environment.set_cwd("")
|
||||
|
||||
|
||||
def test_set_cwd_none():
|
||||
with pytest.raises(AssertionError):
|
||||
Environment.set_cwd(None)
|
||||
|
||||
|
||||
def test_set_and_get_generic():
|
||||
Environment.set("CPL_TEST_KEY", "hello")
|
||||
assert Environment.get("CPL_TEST_KEY", str) == "hello"
|
||||
|
||||
|
||||
def test_get_missing_key_returns_default():
|
||||
result = Environment.get("CPL_NONEXISTENT_KEY_XYZ", str, "default_value")
|
||||
assert result == "default_value"
|
||||
|
||||
|
||||
def test_get_missing_key_returns_none():
|
||||
result = Environment.get("CPL_NONEXISTENT_KEY_XYZ2", str)
|
||||
assert result is None
|
||||
|
||||
|
||||
def test_set_key_empty():
|
||||
with pytest.raises(AssertionError):
|
||||
Environment.set("", "value")
|
||||
|
||||
|
||||
def test_set_key_none():
|
||||
with pytest.raises(AssertionError):
|
||||
Environment.set(None, "value")
|
||||
|
||||
|
||||
def test_environment_enum_values():
|
||||
values = [e.value for e in EnvironmentEnum]
|
||||
assert "production" in values
|
||||
assert "staging" in values
|
||||
assert "testing" in values
|
||||
assert "development" in values
|
||||
@@ -1,38 +0,0 @@
|
||||
import pytest
|
||||
from cpl.core.errors import dependency_error, module_dependency_error
|
||||
|
||||
|
||||
def test_dependency_error_exits(capsys):
|
||||
with pytest.raises(SystemExit):
|
||||
dependency_error("my.feature", "some-package")
|
||||
|
||||
|
||||
def test_dependency_error_prints_message(capsys):
|
||||
with pytest.raises(SystemExit):
|
||||
dependency_error("my.feature", "some-package")
|
||||
captured = capsys.readouterr()
|
||||
assert "some-package" in captured.out
|
||||
assert "my.feature" in captured.out
|
||||
|
||||
|
||||
def test_dependency_error_with_import_error(capsys):
|
||||
try:
|
||||
import nonexistent_package
|
||||
except ImportError as e:
|
||||
with pytest.raises(SystemExit):
|
||||
dependency_error("my.feature", "nonexistent_package", e)
|
||||
captured = capsys.readouterr()
|
||||
assert "nonexistent_package" in captured.out
|
||||
|
||||
|
||||
def test_module_dependency_error_exits(capsys):
|
||||
with pytest.raises(SystemExit):
|
||||
module_dependency_error("MyModule", "SomeDependency")
|
||||
|
||||
|
||||
def test_module_dependency_error_prints_message(capsys):
|
||||
with pytest.raises(SystemExit):
|
||||
module_dependency_error("MyModule", "SomeDependency")
|
||||
captured = capsys.readouterr()
|
||||
assert "SomeDependency" in captured.out
|
||||
assert "MyModule" in captured.out
|
||||
@@ -1,152 +0,0 @@
|
||||
import os
|
||||
import pytest
|
||||
from cpl.core.log.log_level import LogLevel
|
||||
from cpl.core.log.log_settings import LogSettings
|
||||
from cpl.core.log.logger import Logger
|
||||
from cpl.core.log.logger_abc import LoggerABC
|
||||
|
||||
# --- LogLevel ---
|
||||
|
||||
|
||||
def test_log_level_values():
|
||||
assert LogLevel.off.value == "OFF"
|
||||
assert LogLevel.trace.value == "TRC"
|
||||
assert LogLevel.debug.value == "DEB"
|
||||
assert LogLevel.info.value == "INF"
|
||||
assert LogLevel.warning.value == "WAR"
|
||||
assert LogLevel.error.value == "ERR"
|
||||
assert LogLevel.fatal.value == "FAT"
|
||||
|
||||
|
||||
def test_log_level_order():
|
||||
levels = list(LogLevel)
|
||||
assert levels.index(LogLevel.trace) < levels.index(LogLevel.debug)
|
||||
assert levels.index(LogLevel.debug) < levels.index(LogLevel.info)
|
||||
assert levels.index(LogLevel.info) < levels.index(LogLevel.warning)
|
||||
assert levels.index(LogLevel.warning) < levels.index(LogLevel.error)
|
||||
assert levels.index(LogLevel.error) < levels.index(LogLevel.fatal)
|
||||
|
||||
|
||||
def test_log_level_by_name():
|
||||
assert LogLevel["info"] == LogLevel.info
|
||||
assert LogLevel["error"] == LogLevel.error
|
||||
|
||||
|
||||
# --- LogSettings ---
|
||||
|
||||
|
||||
def test_log_settings_defaults():
|
||||
s = LogSettings()
|
||||
assert s.path == "logs"
|
||||
assert s.filename == "app.log"
|
||||
assert s.console == LogLevel.info
|
||||
assert s.level == LogLevel.info
|
||||
|
||||
|
||||
def test_log_settings_from_src():
|
||||
s = LogSettings({"path": "custom_logs", "filename": "myapp.log"})
|
||||
assert s.path == "custom_logs"
|
||||
assert s.filename == "myapp.log"
|
||||
|
||||
|
||||
# --- LoggerABC ---
|
||||
|
||||
|
||||
def test_logger_abc_cannot_instantiate():
|
||||
with pytest.raises(TypeError):
|
||||
LoggerABC()
|
||||
|
||||
|
||||
# --- Logger ---
|
||||
|
||||
|
||||
def test_logger_creates_instance(tmp_path, monkeypatch):
|
||||
monkeypatch.chdir(tmp_path)
|
||||
logger = Logger(__name__)
|
||||
assert logger is not None
|
||||
|
||||
|
||||
def test_logger_log_file_property(tmp_path, monkeypatch):
|
||||
monkeypatch.chdir(tmp_path)
|
||||
logger = Logger(__name__)
|
||||
assert logger.log_file.startswith("logs/")
|
||||
assert logger.log_file.endswith(".log")
|
||||
|
||||
|
||||
def test_logger_should_log_same_level(tmp_path, monkeypatch):
|
||||
monkeypatch.chdir(tmp_path)
|
||||
logger = Logger(__name__)
|
||||
assert logger._should_log(LogLevel.info, LogLevel.info) is True
|
||||
assert logger._should_log(LogLevel.error, LogLevel.error) is True
|
||||
|
||||
|
||||
def test_logger_should_log_higher_level(tmp_path, monkeypatch):
|
||||
monkeypatch.chdir(tmp_path)
|
||||
logger = Logger(__name__)
|
||||
assert logger._should_log(LogLevel.error, LogLevel.info) is True
|
||||
assert logger._should_log(LogLevel.fatal, LogLevel.debug) is True
|
||||
|
||||
|
||||
def test_logger_should_not_log_lower_level(tmp_path, monkeypatch):
|
||||
monkeypatch.chdir(tmp_path)
|
||||
logger = Logger(__name__)
|
||||
assert logger._should_log(LogLevel.debug, LogLevel.info) is False
|
||||
assert logger._should_log(LogLevel.trace, LogLevel.warning) is False
|
||||
|
||||
|
||||
def test_logger_file_format_message(tmp_path, monkeypatch):
|
||||
monkeypatch.chdir(tmp_path)
|
||||
logger = Logger("test_src")
|
||||
msg = logger._file_format_message("INF", "2024-01-01 00:00:00.000000", "hello", "world")
|
||||
assert "INF" in msg
|
||||
assert "hello" in msg
|
||||
assert "world" in msg
|
||||
assert "test_src" in msg
|
||||
|
||||
|
||||
def test_logger_console_format_message(tmp_path, monkeypatch):
|
||||
monkeypatch.chdir(tmp_path)
|
||||
logger = Logger("test_src")
|
||||
msg = logger._console_format_message("DEB", "2024-01-01 00:00:00.000000", "debug message")
|
||||
assert "DEB" in msg
|
||||
assert "debug message" in msg
|
||||
|
||||
|
||||
def test_logger_info_writes_file(tmp_path, monkeypatch):
|
||||
monkeypatch.chdir(tmp_path)
|
||||
logger = Logger(__name__)
|
||||
logger.info("test info message")
|
||||
log_files = list(tmp_path.glob("logs/*.log"))
|
||||
assert len(log_files) > 0
|
||||
content = log_files[0].read_text()
|
||||
assert "test info message" in content
|
||||
|
||||
|
||||
def test_logger_debug_below_info_not_written_to_file(tmp_path, monkeypatch):
|
||||
monkeypatch.chdir(tmp_path)
|
||||
# Default level is INFO, so DEBUG should not appear in the file
|
||||
logger = Logger(__name__)
|
||||
logger.debug("should not appear in file")
|
||||
log_files = list(tmp_path.glob("logs/*.log"))
|
||||
if log_files:
|
||||
content = log_files[0].read_text()
|
||||
assert "should not appear in file" not in content
|
||||
|
||||
|
||||
def test_logger_set_level_invalid(tmp_path, monkeypatch):
|
||||
monkeypatch.chdir(tmp_path)
|
||||
with pytest.raises(ValueError):
|
||||
Logger.set_level("INVALID_LEVEL")
|
||||
|
||||
|
||||
def test_logger_fatal_exits(tmp_path, monkeypatch):
|
||||
monkeypatch.chdir(tmp_path)
|
||||
logger = Logger(__name__)
|
||||
with pytest.raises(SystemExit):
|
||||
logger.fatal("fatal error")
|
||||
|
||||
|
||||
def test_logger_fatal_prevent_quit(tmp_path, monkeypatch):
|
||||
monkeypatch.chdir(tmp_path)
|
||||
logger = Logger(__name__)
|
||||
logger.fatal("fatal no quit", prevent_quit=True) # Should not raise
|
||||
@@ -1,36 +0,0 @@
|
||||
import pytest
|
||||
from cpl.core.pipes.bool_pipe import BoolPipe
|
||||
|
||||
|
||||
def test_to_str_true():
|
||||
assert BoolPipe.to_str(True) == "true"
|
||||
|
||||
|
||||
def test_to_str_false():
|
||||
assert BoolPipe.to_str(False) == "false"
|
||||
|
||||
|
||||
def test_from_str_true_values():
|
||||
assert BoolPipe.from_str("True") is True
|
||||
assert BoolPipe.from_str("true") is True
|
||||
assert BoolPipe.from_str("1") is True
|
||||
assert BoolPipe.from_str("yes") is True
|
||||
assert BoolPipe.from_str("y") is True
|
||||
assert BoolPipe.from_str("Y") is True
|
||||
|
||||
|
||||
def test_from_str_false_values():
|
||||
assert BoolPipe.from_str("false") is False
|
||||
assert BoolPipe.from_str("False") is False
|
||||
assert BoolPipe.from_str("0") is False
|
||||
assert BoolPipe.from_str("no") is False
|
||||
assert BoolPipe.from_str("") is False
|
||||
assert BoolPipe.from_str("anything") is False
|
||||
|
||||
|
||||
def test_roundtrip_true():
|
||||
assert BoolPipe.from_str(BoolPipe.to_str(True)) is True
|
||||
|
||||
|
||||
def test_roundtrip_false():
|
||||
assert BoolPipe.from_str(BoolPipe.to_str(False)) is False
|
||||
@@ -1,59 +0,0 @@
|
||||
import pytest
|
||||
from cpl.core.pipes.ip_address_pipe import IPAddressPipe
|
||||
|
||||
|
||||
def test_to_str_valid():
|
||||
assert IPAddressPipe.to_str([192, 168, 1, 1]) == "192.168.1.1"
|
||||
assert IPAddressPipe.to_str([0, 0, 0, 0]) == "0.0.0.0"
|
||||
assert IPAddressPipe.to_str([255, 255, 255, 255]) == "255.255.255.255"
|
||||
assert IPAddressPipe.to_str([127, 0, 0, 1]) == "127.0.0.1"
|
||||
|
||||
|
||||
def test_to_str_too_few_parts():
|
||||
with pytest.raises(ValueError):
|
||||
IPAddressPipe.to_str([192, 168, 1])
|
||||
|
||||
|
||||
def test_to_str_too_many_parts():
|
||||
with pytest.raises(ValueError):
|
||||
IPAddressPipe.to_str([192, 168, 1, 1, 5])
|
||||
|
||||
|
||||
def test_to_str_byte_out_of_range():
|
||||
with pytest.raises(ValueError):
|
||||
IPAddressPipe.to_str([256, 0, 0, 0])
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
IPAddressPipe.to_str([-1, 0, 0, 0])
|
||||
|
||||
|
||||
def test_from_str_valid():
|
||||
assert IPAddressPipe.from_str("192.168.1.1") == [192, 168, 1, 1]
|
||||
assert IPAddressPipe.from_str("0.0.0.0") == [0, 0, 0, 0]
|
||||
assert IPAddressPipe.from_str("255.255.255.255") == [255, 255, 255, 255]
|
||||
assert IPAddressPipe.from_str("127.0.0.1") == [127, 0, 0, 1]
|
||||
|
||||
|
||||
def test_from_str_too_few_parts():
|
||||
with pytest.raises(Exception):
|
||||
IPAddressPipe.from_str("192.168.1")
|
||||
|
||||
|
||||
def test_from_str_too_many_parts():
|
||||
with pytest.raises(Exception):
|
||||
IPAddressPipe.from_str("192.168.1.1.5")
|
||||
|
||||
|
||||
def test_from_str_byte_out_of_range():
|
||||
with pytest.raises(Exception):
|
||||
IPAddressPipe.from_str("256.0.0.0")
|
||||
|
||||
|
||||
def test_from_str_invalid_format():
|
||||
with pytest.raises(Exception):
|
||||
IPAddressPipe.from_str("not.an.ip.addr")
|
||||
|
||||
|
||||
def test_roundtrip():
|
||||
original = [10, 20, 30, 40]
|
||||
assert IPAddressPipe.from_str(IPAddressPipe.to_str(original)) == original
|
||||
@@ -1,35 +0,0 @@
|
||||
from cpl.core.property import classproperty
|
||||
|
||||
|
||||
class MyClass:
|
||||
_value = 42
|
||||
|
||||
@classproperty
|
||||
def value(cls):
|
||||
return cls._value
|
||||
|
||||
@classproperty
|
||||
def name(cls):
|
||||
return cls.__name__
|
||||
|
||||
|
||||
class Child(MyClass):
|
||||
_value = 99
|
||||
|
||||
|
||||
def test_classproperty_on_class():
|
||||
assert MyClass.value == 42
|
||||
|
||||
|
||||
def test_classproperty_on_instance():
|
||||
obj = MyClass()
|
||||
assert obj.value == 42
|
||||
|
||||
|
||||
def test_classproperty_subclass_inherits_override():
|
||||
assert Child.value == 99
|
||||
|
||||
|
||||
def test_classproperty_returns_class_name():
|
||||
assert MyClass.name == "MyClass"
|
||||
assert Child.name == "Child"
|
||||
@@ -1,118 +0,0 @@
|
||||
import asyncio
|
||||
import pytest
|
||||
from cpl.core.service.hosted_service import HostedService
|
||||
from cpl.core.service.startup_task import StartupTask
|
||||
from cpl.core.service.cronjob import CronjobABC
|
||||
from cpl.core.time.cron import Cron
|
||||
|
||||
# --- HostedService ---
|
||||
|
||||
|
||||
def test_hosted_service_cannot_instantiate():
|
||||
with pytest.raises(TypeError):
|
||||
HostedService()
|
||||
|
||||
|
||||
def test_hosted_service_concrete_can_start_stop():
|
||||
class MyService(HostedService):
|
||||
def __init__(self):
|
||||
self.started = False
|
||||
self.stopped = False
|
||||
|
||||
async def start(self):
|
||||
self.started = True
|
||||
|
||||
async def stop(self):
|
||||
self.stopped = True
|
||||
|
||||
async def run():
|
||||
svc = MyService()
|
||||
await svc.start()
|
||||
assert svc.started is True
|
||||
await svc.stop()
|
||||
assert svc.stopped is True
|
||||
|
||||
asyncio.run(run())
|
||||
|
||||
|
||||
# --- StartupTask ---
|
||||
|
||||
|
||||
def test_startup_task_cannot_instantiate():
|
||||
with pytest.raises(TypeError):
|
||||
StartupTask()
|
||||
|
||||
|
||||
def test_startup_task_concrete_runs():
|
||||
class MyTask(StartupTask):
|
||||
def __init__(self):
|
||||
self.ran = False
|
||||
|
||||
async def run(self):
|
||||
self.ran = True
|
||||
|
||||
async def execute():
|
||||
task = MyTask()
|
||||
await task.run()
|
||||
assert task.ran is True
|
||||
|
||||
asyncio.run(execute())
|
||||
|
||||
|
||||
# --- CronjobABC ---
|
||||
|
||||
|
||||
def test_cronjob_cannot_instantiate():
|
||||
with pytest.raises(TypeError):
|
||||
CronjobABC(Cron("* * * * *"))
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_cronjob_loop_is_called():
|
||||
loop_calls = []
|
||||
|
||||
class MyCronJob(CronjobABC):
|
||||
def __init__(self):
|
||||
CronjobABC.__init__(self, Cron("* * * * *"))
|
||||
|
||||
async def loop(self):
|
||||
loop_calls.append(1)
|
||||
|
||||
job = MyCronJob()
|
||||
assert job._running is False
|
||||
await job.start()
|
||||
assert job._running is True
|
||||
# Give it a moment then stop
|
||||
await asyncio.sleep(0.05)
|
||||
await job.stop()
|
||||
assert job._running is False
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_cronjob_stop_cancels_task():
|
||||
class MyCronJob(CronjobABC):
|
||||
def __init__(self):
|
||||
CronjobABC.__init__(self, Cron("* * * * *"))
|
||||
|
||||
async def loop(self):
|
||||
pass
|
||||
|
||||
job = MyCronJob()
|
||||
await job.start()
|
||||
assert job._task is not None
|
||||
await job.stop()
|
||||
assert job._task.cancelled() or job._task.done()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_cronjob_stop_without_start():
|
||||
class MyCronJob(CronjobABC):
|
||||
def __init__(self):
|
||||
CronjobABC.__init__(self, Cron("* * * * *"))
|
||||
|
||||
async def loop(self):
|
||||
pass
|
||||
|
||||
job = MyCronJob()
|
||||
# stop without start should not raise
|
||||
await job.stop()
|
||||
@@ -1,60 +0,0 @@
|
||||
import pytest
|
||||
from datetime import datetime
|
||||
from cpl.core.time.cron import Cron
|
||||
|
||||
|
||||
def test_next_returns_datetime():
|
||||
cron = Cron("* * * * *")
|
||||
result = cron.next()
|
||||
assert isinstance(result, datetime)
|
||||
|
||||
|
||||
def test_next_is_in_the_future():
|
||||
cron = Cron("* * * * *")
|
||||
result = cron.next()
|
||||
assert result > datetime.now()
|
||||
|
||||
|
||||
def test_next_called_multiple_times_is_monotonic():
|
||||
cron = Cron("* * * * *")
|
||||
results = [cron.next() for _ in range(5)]
|
||||
for i in range(1, len(results)):
|
||||
assert results[i] > results[i - 1]
|
||||
|
||||
|
||||
def test_every_minute_interval():
|
||||
start = datetime(2024, 1, 1, 12, 0, 0)
|
||||
cron = Cron("* * * * *", start_time=start)
|
||||
first = cron.next()
|
||||
second = cron.next()
|
||||
diff = (second - first).total_seconds()
|
||||
assert diff == 60
|
||||
|
||||
|
||||
def test_hourly_interval():
|
||||
start = datetime(2024, 1, 1, 12, 0, 0)
|
||||
cron = Cron("0 * * * *", start_time=start)
|
||||
first = cron.next()
|
||||
second = cron.next()
|
||||
diff = (second - first).total_seconds()
|
||||
assert diff == 3600
|
||||
|
||||
|
||||
def test_daily_midnight():
|
||||
start = datetime(2024, 1, 1, 0, 0, 0)
|
||||
cron = Cron("0 0 * * *", start_time=start)
|
||||
first = cron.next()
|
||||
assert first.hour == 0
|
||||
assert first.minute == 0
|
||||
|
||||
|
||||
def test_custom_start_time():
|
||||
start = datetime(2024, 6, 15, 10, 30, 0)
|
||||
cron = Cron("*/5 * * * *", start_time=start)
|
||||
result = cron.next()
|
||||
assert result > start
|
||||
|
||||
|
||||
def test_invalid_cron_expression():
|
||||
with pytest.raises((ValueError, KeyError)):
|
||||
Cron("invalid expression")
|
||||
@@ -1,44 +0,0 @@
|
||||
import pytest
|
||||
from cpl.core.time.time_format_settings import TimeFormatSettings
|
||||
|
||||
|
||||
def test_defaults_are_none():
|
||||
s = TimeFormatSettings()
|
||||
assert s.date_format is None
|
||||
assert s.time_format is None
|
||||
assert s.date_time_format is None
|
||||
assert s.date_time_log_format is None
|
||||
|
||||
|
||||
def test_constructor_sets_all_fields():
|
||||
s = TimeFormatSettings(
|
||||
date_format="%Y-%m-%d",
|
||||
time_format="%H:%M:%S",
|
||||
date_time_format="%Y-%m-%d %H:%M:%S",
|
||||
date_time_log_format="%Y-%m-%d %H:%M:%S.%f",
|
||||
)
|
||||
assert s.date_format == "%Y-%m-%d"
|
||||
assert s.time_format == "%H:%M:%S"
|
||||
assert s.date_time_format == "%Y-%m-%d %H:%M:%S"
|
||||
assert s.date_time_log_format == "%Y-%m-%d %H:%M:%S.%f"
|
||||
|
||||
|
||||
def test_setters_update_values():
|
||||
s = TimeFormatSettings()
|
||||
s.date_format = "%d/%m/%Y"
|
||||
s.time_format = "%I:%M %p"
|
||||
s.date_time_format = "%d/%m/%Y %I:%M %p"
|
||||
s.date_time_log_format = "%d/%m/%Y %H:%M:%S.%f"
|
||||
assert s.date_format == "%d/%m/%Y"
|
||||
assert s.time_format == "%I:%M %p"
|
||||
assert s.date_time_format == "%d/%m/%Y %I:%M %p"
|
||||
assert s.date_time_log_format == "%d/%m/%Y %H:%M:%S.%f"
|
||||
|
||||
|
||||
def test_partial_construction():
|
||||
# TimeFormatSettings uses constructor args, not option()/from_src()
|
||||
s = TimeFormatSettings(date_format="%Y/%m/%d")
|
||||
assert s.date_format == "%Y/%m/%d"
|
||||
assert s.time_format is None
|
||||
assert s.date_time_format is None
|
||||
assert s.date_time_log_format is None
|
||||
@@ -1,60 +0,0 @@
|
||||
import pytest
|
||||
from cpl.core.utils.benchmark import Benchmark
|
||||
|
||||
|
||||
def noop():
|
||||
pass
|
||||
|
||||
|
||||
def test_benchmark_time_does_not_raise(capsys):
|
||||
Benchmark.time("noop", noop, iterations=3)
|
||||
captured = capsys.readouterr()
|
||||
assert "noop" in captured.out
|
||||
assert "min" in captured.out
|
||||
assert "avg" in captured.out
|
||||
|
||||
|
||||
def test_benchmark_memory_does_not_raise(capsys):
|
||||
Benchmark.memory("noop_mem", noop, iterations=3)
|
||||
captured = capsys.readouterr()
|
||||
assert "noop_mem" in captured.out
|
||||
assert "mem" in captured.out
|
||||
|
||||
|
||||
def test_benchmark_all_does_not_raise(capsys):
|
||||
Benchmark.all("noop_all", noop, iterations=3)
|
||||
captured = capsys.readouterr()
|
||||
assert "noop_all" in captured.out
|
||||
assert "min" in captured.out
|
||||
assert "mem" in captured.out
|
||||
|
||||
|
||||
def test_benchmark_time_calls_func():
|
||||
calls = []
|
||||
|
||||
def tracked():
|
||||
calls.append(1)
|
||||
|
||||
Benchmark.time("tracked", tracked, iterations=4)
|
||||
assert len(calls) == 4
|
||||
|
||||
|
||||
def test_benchmark_memory_calls_func():
|
||||
calls = []
|
||||
|
||||
def tracked():
|
||||
calls.append(1)
|
||||
|
||||
Benchmark.memory("tracked_mem", tracked, iterations=3)
|
||||
assert len(calls) == 3
|
||||
|
||||
|
||||
def test_benchmark_all_calls_func_twice_per_iteration():
|
||||
calls = []
|
||||
|
||||
def tracked():
|
||||
calls.append(1)
|
||||
|
||||
Benchmark.all("tracked_all", tracked, iterations=2)
|
||||
# all() runs func once per iteration for time, once per iteration for memory = 2*iterations
|
||||
assert len(calls) == 4
|
||||
@@ -1,125 +0,0 @@
|
||||
import time
|
||||
import pytest
|
||||
from cpl.core.utils.cache import Cache
|
||||
|
||||
|
||||
def test_set_and_get():
|
||||
cache = Cache()
|
||||
cache.set("key1", "value1")
|
||||
assert cache.get("key1") == "value1"
|
||||
cache.stop()
|
||||
|
||||
|
||||
def test_get_missing_key_returns_none():
|
||||
cache = Cache()
|
||||
assert cache.get("nonexistent") is None
|
||||
cache.stop()
|
||||
|
||||
|
||||
def test_has_existing_key():
|
||||
cache = Cache()
|
||||
cache.set("key", "val")
|
||||
assert cache.has("key") is True
|
||||
cache.stop()
|
||||
|
||||
|
||||
def test_has_missing_key():
|
||||
cache = Cache()
|
||||
assert cache.has("missing") is False
|
||||
cache.stop()
|
||||
|
||||
|
||||
def test_delete():
|
||||
cache = Cache()
|
||||
cache.set("key", "val")
|
||||
cache.delete("key")
|
||||
assert cache.get("key") is None
|
||||
assert cache.has("key") is False
|
||||
cache.stop()
|
||||
|
||||
|
||||
def test_clear():
|
||||
cache = Cache()
|
||||
cache.set("a", 1)
|
||||
cache.set("b", 2)
|
||||
cache.clear()
|
||||
assert cache.get("a") is None
|
||||
assert cache.get("b") is None
|
||||
cache.stop()
|
||||
|
||||
|
||||
def test_get_all():
|
||||
cache = Cache()
|
||||
cache.set("x", 10)
|
||||
cache.set("y", 20)
|
||||
values = cache.get_all()
|
||||
assert set(values) == {10, 20}
|
||||
cache.stop()
|
||||
|
||||
|
||||
def test_get_all_empty():
|
||||
cache = Cache()
|
||||
assert cache.get_all() == []
|
||||
cache.stop()
|
||||
|
||||
|
||||
def test_ttl_expiry():
|
||||
cache = Cache()
|
||||
cache.set("temp", "data", ttl=1)
|
||||
assert cache.get("temp") == "data"
|
||||
time.sleep(1.1)
|
||||
assert cache.get("temp") is None
|
||||
cache.stop()
|
||||
|
||||
|
||||
def test_has_after_ttl_expiry():
|
||||
cache = Cache()
|
||||
cache.set("temp", "data", ttl=1)
|
||||
time.sleep(1.1)
|
||||
assert cache.has("temp") is False
|
||||
cache.stop()
|
||||
|
||||
|
||||
def test_default_ttl():
|
||||
cache = Cache(default_ttl=1)
|
||||
cache.set("key", "value")
|
||||
assert cache.get("key") == "value"
|
||||
time.sleep(1.1)
|
||||
assert cache.get("key") is None
|
||||
cache.stop()
|
||||
|
||||
|
||||
def test_per_key_ttl_overrides_default():
|
||||
cache = Cache(default_ttl=60)
|
||||
cache.set("short", "val", ttl=1)
|
||||
time.sleep(1.1)
|
||||
assert cache.get("short") is None
|
||||
cache.stop()
|
||||
|
||||
|
||||
def test_get_all_excludes_expired():
|
||||
cache = Cache()
|
||||
cache.set("permanent", "keep")
|
||||
cache.set("temporary", "gone", ttl=1)
|
||||
time.sleep(1.1)
|
||||
values = cache.get_all()
|
||||
assert "keep" in values
|
||||
assert "gone" not in values
|
||||
cache.stop()
|
||||
|
||||
|
||||
def test_overwrite_key():
|
||||
cache = Cache()
|
||||
cache.set("key", "old")
|
||||
cache.set("key", "new")
|
||||
assert cache.get("key") == "new"
|
||||
cache.stop()
|
||||
|
||||
|
||||
def test_cleanup_removes_expired():
|
||||
cache = Cache(cleanup_interval=9999)
|
||||
cache.set("exp", "x", ttl=1)
|
||||
time.sleep(1.1)
|
||||
cache.cleanup()
|
||||
assert cache.get("exp") is None
|
||||
cache.stop()
|
||||
@@ -1,64 +0,0 @@
|
||||
import pytest
|
||||
from typing import List
|
||||
from cpl.core.utils.get_value import get_value
|
||||
|
||||
|
||||
def test_get_existing_str():
|
||||
assert get_value({"key": "hello"}, "key", str) == "hello"
|
||||
|
||||
|
||||
def test_get_existing_int_already_typed():
|
||||
# Value already has the correct type -> returned directly
|
||||
assert get_value({"count": 42}, "count", int) == 42
|
||||
|
||||
|
||||
def test_get_existing_float_already_typed():
|
||||
# Value already has the correct type -> returned directly
|
||||
assert get_value({"pi": 3.14}, "pi", float) == 3.14
|
||||
|
||||
|
||||
# NOTE: get_value calls cast() internally but does NOT return the cast result
|
||||
# (the return value is lost). String "42" -> int returns None instead of 42.
|
||||
# This is a known bug in get_value.
|
||||
def test_get_str_to_int_returns_none_bug():
|
||||
result = get_value({"count": "42"}, "count", int)
|
||||
assert result is None # Bug: should be 42
|
||||
|
||||
|
||||
def test_get_missing_key_returns_none():
|
||||
assert get_value({}, "missing", str) is None
|
||||
|
||||
|
||||
def test_get_missing_key_returns_default():
|
||||
assert get_value({}, "missing", str, "fallback") == "fallback"
|
||||
|
||||
|
||||
def test_get_value_already_correct_type():
|
||||
assert get_value({"x": 5}, "x", int) == 5
|
||||
|
||||
|
||||
def test_get_list_already_correct_type():
|
||||
result = get_value({"items": [1, 2, 3]}, "items", List[int])
|
||||
assert result == [1, 2, 3]
|
||||
|
||||
|
||||
def test_get_list_of_str_already_correct_type():
|
||||
result = get_value({"tags": ["a", "b"]}, "tags", List[str])
|
||||
assert result == ["a", "b"]
|
||||
|
||||
|
||||
def test_get_bool_already_typed():
|
||||
# Value is already bool -> returned directly
|
||||
assert get_value({"flag": True}, "flag", bool) is True
|
||||
assert get_value({"flag": False}, "flag", bool) is False
|
||||
|
||||
|
||||
# NOTE: Same bug – string "true" -> bool returns None
|
||||
def test_get_bool_from_str_returns_none_bug():
|
||||
result = get_value({"flag": "true"}, "flag", bool)
|
||||
assert result is None # Bug: should be True
|
||||
|
||||
|
||||
def test_cast_failure_returns_default():
|
||||
result = get_value({"val": "not_a_number"}, "val", int, -1)
|
||||
assert result == -1
|
||||
@@ -1,93 +0,0 @@
|
||||
import pytest
|
||||
from enum import Enum
|
||||
from cpl.core.utils.json_processor import JSONProcessor
|
||||
|
||||
|
||||
class Color(Enum):
|
||||
RED = "RED"
|
||||
GREEN = "GREEN"
|
||||
|
||||
|
||||
class Point:
|
||||
def __init__(self, x: int, y: int):
|
||||
self.x = x
|
||||
self.y = y
|
||||
|
||||
|
||||
class Named:
|
||||
def __init__(self, name: str, value: int = 0):
|
||||
self.name = name
|
||||
self.value = value
|
||||
|
||||
|
||||
class Nested:
|
||||
def __init__(self, point: Point, label: str):
|
||||
self.point = point
|
||||
self.label = label
|
||||
|
||||
|
||||
class WithEnum:
|
||||
def __init__(self, color: Color):
|
||||
self.color = color
|
||||
|
||||
|
||||
def test_simple_object():
|
||||
obj = JSONProcessor.process(Point, {"X": 3, "Y": 7})
|
||||
assert obj.x == 3
|
||||
assert obj.y == 7
|
||||
|
||||
|
||||
def test_camel_case_keys():
|
||||
obj = JSONProcessor.process(Named, {"Name": "Alice", "Value": 42})
|
||||
assert obj.name == "Alice"
|
||||
assert obj.value == 42
|
||||
|
||||
|
||||
def test_default_value_used():
|
||||
obj = JSONProcessor.process(Named, {"Name": "Bob"})
|
||||
assert obj.name == "Bob"
|
||||
assert obj.value == 0
|
||||
|
||||
|
||||
def test_none_for_missing_required():
|
||||
obj = JSONProcessor.process(Named, {})
|
||||
assert obj.name is None
|
||||
|
||||
|
||||
def test_nested_object():
|
||||
data = {"Point": {"X": 1, "Y": 2}, "Label": "origin"}
|
||||
obj = JSONProcessor.process(Nested, data)
|
||||
assert isinstance(obj.point, Point)
|
||||
assert obj.point.x == 1
|
||||
assert obj.point.y == 2
|
||||
assert obj.label == "origin"
|
||||
|
||||
|
||||
def test_enum_value():
|
||||
obj = JSONProcessor.process(WithEnum, {"Color": "RED"})
|
||||
assert obj.color == Color.RED
|
||||
|
||||
|
||||
def test_first_lower_key():
|
||||
obj = JSONProcessor.process(Named, {"name": "Charlie", "value": 7})
|
||||
assert obj.name == "Charlie"
|
||||
assert obj.value == 7
|
||||
|
||||
|
||||
def test_upper_case_key():
|
||||
obj = JSONProcessor.process(Named, {"NAME": "Dave"})
|
||||
assert obj.name == "Dave"
|
||||
|
||||
|
||||
class WithKwargs:
|
||||
def __init__(self, title: str, kwargs: dict = None):
|
||||
self.title = title
|
||||
self.kwargs = kwargs or {}
|
||||
|
||||
|
||||
def test_kwargs_collected():
|
||||
# Bug: JSONProcessor collects leftover keys into a dict for a `kwargs: dict` param,
|
||||
# but then passes them as **kwargs to the constructor instead of as a positional dict arg.
|
||||
# This causes TypeError when the constructor does not accept **kwargs.
|
||||
with pytest.raises(TypeError):
|
||||
JSONProcessor.process(WithKwargs, {"Title": "Hello", "extra1": "a", "extra2": "b"})
|
||||
Reference in New Issue
Block a user