Compare commits

..

5 Commits

Author SHA1 Message Date
1a67318091 Config model options handling. Closes #185
All checks were successful
Build on push / prepare (push) Successful in 10s
Build on push / core (push) Successful in 19s
Build on push / query (push) Successful in 19s
Build on push / dependency (push) Successful in 25s
Build on push / translation (push) Successful in 17s
Build on push / database (push) Successful in 20s
Build on push / application (push) Successful in 21s
Build on push / mail (push) Successful in 20s
Build on push / auth (push) Successful in 14s
2025-09-19 17:47:49 +02:00
2be58f6577 Introduced fernet to credential manager. Closes #183
All checks were successful
Build on push / prepare (push) Successful in 10s
Build on push / core (push) Successful in 19s
Build on push / query (push) Successful in 22s
Build on push / dependency (push) Successful in 15s
Build on push / application (push) Successful in 20s
Build on push / database (push) Successful in 21s
Build on push / translation (push) Successful in 21s
Build on push / mail (push) Successful in 22s
Build on push / auth (push) Successful in 18s
2025-09-19 15:01:16 +02:00
9c6078f4fd with_logging & logger level fix
All checks were successful
Build on push / prepare (push) Successful in 8s
Build on push / core (push) Successful in 18s
Build on push / query (push) Successful in 18s
Build on push / dependency (push) Successful in 14s
Build on push / translation (push) Successful in 15s
Build on push / database (push) Successful in 17s
Build on push / application (push) Successful in 18s
Build on push / mail (push) Successful in 20s
Build on push / auth (push) Successful in 18s
2025-09-17 22:18:38 +02:00
dfdc31512d App with extension functions
All checks were successful
Build on push / prepare (push) Successful in 9s
Build on push / core (push) Successful in 17s
Build on push / query (push) Successful in 17s
Build on push / dependency (push) Successful in 14s
Build on push / translation (push) Successful in 14s
Build on push / database (push) Successful in 18s
Build on push / mail (push) Successful in 19s
Build on push / application (push) Successful in 22s
Build on push / auth (push) Successful in 14s
2025-09-17 21:56:47 +02:00
ab7ff7da93 Made startup/app extensions static 2025-09-17 20:54:21 +02:00
65 changed files with 541 additions and 492 deletions

1
.gitignore vendored
View File

@@ -113,6 +113,7 @@ venv.bak/
# Custom Environments
cpl-env/
.secret
# Spyder project settings
.spyderproject

View File

@@ -1,10 +1,18 @@
from abc import ABC, abstractmethod
from typing import Callable, Self
from cpl.application.host import Host
from cpl.core.console.console import Console
from cpl.core.log import LogSettings
from cpl.core.log.log_level_enum import LogLevel
from cpl.core.log.logger_abc import LoggerABC
from cpl.dependency.service_provider_abc import ServiceProviderABC
def __not_implemented__(package: str, func: Callable):
raise NotImplementedError(f"Package {package} is required to use {func.__name__} method")
class ApplicationABC(ABC):
r"""ABC for the Application class
@@ -17,6 +25,53 @@ class ApplicationABC(ABC):
def __init__(self, services: ServiceProviderABC):
self._services = services
@classmethod
def extend(cls, name: str | Callable, func: Callable[[Self], Self]):
r"""Extend the Application with a custom method
Parameters:
name: :class:`str`
Name of the method
func: :class:`Callable[[Self], Self]`
Function that takes the Application as a parameter and returns it
"""
if callable(name):
name = name.__name__
setattr(cls, name, func)
return cls
def with_logging(self, level: LogLevel = None):
if level is None:
from cpl.core.configuration.configuration import Configuration
settings = Configuration.get(LogSettings)
level = settings.level if settings else LogLevel.info
logger = self._services.get_service(LoggerABC)
logger.set_level(level)
def with_permissions(self, *args, **kwargs):
__not_implemented__("cpl-auth", self.with_permissions)
def with_migrations(self, *args, **kwargs):
__not_implemented__("cpl-database", self.with_migrations)
def with_seeders(self, *args, **kwargs):
__not_implemented__("cpl-database", self.with_seeders)
def with_extension(self, func: Callable[[Self, ...], None], *args, **kwargs):
r"""Extend the Application with a custom method
Parameters:
func: :class:`Callable[[Self], Self]`
Function that takes the Application as a parameter and returns it
"""
assert func is not None, "func must not be None"
assert callable(func), "func must be callable"
func(self, *args, **kwargs)
def run(self):
r"""Entry point

View File

@@ -4,8 +4,7 @@ from cpl.dependency import ServiceProviderABC
class ApplicationExtensionABC(ABC):
@abstractmethod
def __init__(self): ...
@staticmethod
@abstractmethod
def run(self, services: ServiceProviderABC): ...
def run(services: ServiceProviderABC): ...

View File

@@ -6,15 +6,14 @@ from cpl.dependency.service_collection import ServiceCollection
class StartupABC(ABC):
r"""ABC for the startup class"""
@staticmethod
@abstractmethod
def __init__(self): ...
@abstractmethod
def configure_configuration(self):
def configure_configuration():
r"""Creates configuration of application"""
@staticmethod
@abstractmethod
def configure_services(self, service: ServiceCollection):
def configure_services(service: ServiceCollection):
r"""Creates service provider
Parameter:

View File

@@ -6,15 +6,14 @@ from cpl.dependency import ServiceCollection
class StartupExtensionABC(ABC):
r"""ABC for startup extension classes"""
@staticmethod
@abstractmethod
def __init__(self): ...
@abstractmethod
def configure_configuration(self):
def configure_configuration():
r"""Creates configuration of application"""
@staticmethod
@abstractmethod
def configure_services(self, services: ServiceCollection):
def configure_services(services: ServiceCollection):
r"""Creates service provider
Parameter:
services: :class:`cpl.dependency.service_collection`

View File

@@ -1,5 +1,5 @@
import asyncio
from typing import Type, Optional, Callable
from typing import Type, Optional
from cpl.application.abc.application_abc import ApplicationABC
from cpl.application.abc.application_extension_abc import ApplicationExtensionABC
@@ -34,11 +34,11 @@ class ApplicationBuilder:
def service_provider(self):
return self._services.build()
def use_startup(self, startup: Type[StartupABC]) -> "ApplicationBuilder":
self._startup = startup()
def with_startup(self, startup: Type[StartupABC]) -> "ApplicationBuilder":
self._startup = startup
return self
def use_extension(
def with_extension(
self,
extension: Type[ApplicationExtensionABC | StartupExtensionABC],
) -> "ApplicationBuilder":
@@ -50,8 +50,7 @@ class ApplicationBuilder:
return self
def build(self) -> ApplicationABC:
for ex in self._startup_extensions:
extension = ex()
for extension in self._startup_extensions:
Host.run(extension.configure_configuration)
Host.run(extension.configure_services, self._services)
@@ -59,8 +58,7 @@ class ApplicationBuilder:
Host.run(self._startup.configure_configuration)
Host.run(self._startup.configure_services, self._services)
for ex in self._app_extensions:
extension = ex()
for extension in self._app_extensions:
Host.run(extension.run, self.service_provider)
return self._app(self.service_provider)

View File

@@ -1,13 +1,24 @@
from cpl.auth import permission as _permission
from cpl.auth.keycloak.keycloak_admin import KeycloakAdmin
from cpl.auth.keycloak.keycloak_client import KeycloakClient
from cpl.dependency import ServiceCollection as _ServiceCollection
from enum import Enum
from typing import Type
from cpl.application.abc import ApplicationABC as _ApplicationABC
from cpl.auth import permission as _permission
from cpl.auth.keycloak.keycloak_admin import KeycloakAdmin as _KeycloakAdmin
from cpl.auth.keycloak.keycloak_client import KeycloakClient as _KeycloakClient
from cpl.dependency.service_collection import ServiceCollection as _ServiceCollection
from .auth_logger import AuthLogger
from .keycloak_settings import KeycloakSettings
from .permission_seeder import PermissionSeeder
def _with_permissions(self: _ApplicationABC, *permissions: Type[Enum]) -> _ApplicationABC:
from cpl.auth.permission.permissions_registry import PermissionsRegistry
for perm in permissions:
PermissionsRegistry.with_enum(perm)
return self
def _add_daos(collection: _ServiceCollection):
from .schema._administration.auth_user_dao import AuthUserDao
from .schema._administration.api_key_dao import ApiKeyDao
@@ -34,8 +45,8 @@ def add_auth(collection: _ServiceCollection):
from cpl.database.model.server_type import ServerType, ServerTypes
try:
collection.add_singleton(KeycloakClient)
collection.add_singleton(KeycloakAdmin)
collection.add_singleton(_KeycloakClient)
collection.add_singleton(_KeycloakAdmin)
_add_daos(collection)
@@ -68,3 +79,4 @@ def add_permission(collection: _ServiceCollection):
_ServiceCollection.with_module(add_auth, __name__)
_ServiceCollection.with_module(add_permission, _permission.__name__)
_ApplicationABC.extend(_ApplicationABC.with_permissions, _with_permissions)

View File

@@ -1,37 +1,17 @@
from typing import Optional
from cpl.core.configuration.configuration_model_abc import ConfigurationModelABC
from cpl.core.environment import Environment
class KeycloakSettings(ConfigurationModelABC):
def __init__(
self,
url: str = Environment.get("KEYCLOAK_URL", str),
client_id: str = Environment.get("KEYCLOAK_CLIENT_ID", str),
realm: str = Environment.get("KEYCLOAK_REALM", str),
client_secret: str = Environment.get("KEYCLOAK_CLIENT_SECRET", str),
src: Optional[dict] = None,
):
ConfigurationModelABC.__init__(self)
ConfigurationModelABC.__init__(self, src, "KEYCLOAK")
self._url: Optional[str] = url
self._client_id: Optional[str] = client_id
self._realm: Optional[str] = realm
self._client_secret: Optional[str] = client_secret
@property
def url(self) -> Optional[str]:
return self._url
@property
def client_id(self) -> Optional[str]:
return self._client_id
@property
def realm(self) -> Optional[str]:
return self._realm
@property
def client_secret(self) -> Optional[str]:
return self._client_secret
self.option("url", str, required=True)
self.option("client_id", str, required=True)
self.option("realm", str, required=True)
self.option("client_secret", str, required=True)

View File

@@ -1,25 +1,27 @@
import secrets
from datetime import datetime
from typing import Optional
from typing import Optional, Union
from async_property import async_property
from cpl.auth.permission.permissions import Permissions
from cpl.core.environment import Environment
from cpl.core.log import Logger
from cpl.core.typing import SerialId, Id
from cpl.database.abc import DbModelABC
from cpl.dependency import ServiceProviderABC
from cpl.core.environment.environment import Environment
from cpl.core.log.logger import Logger
from cpl.core.typing import Id, SerialId
from cpl.core.utils.credential_manager import CredentialManager
from cpl.database.abc.db_model_abc import DbModelABC
from cpl.dependency.service_provider_abc import ServiceProviderABC
_logger = Logger(__name__)
class ApiKey(DbModelABC):
def __init__(
self,
id: SerialId,
identifier: str,
key: str,
key: Union[str, bytes],
deleted: bool = False,
editor_id: Optional[Id] = None,
created: Optional[datetime] = None,
@@ -37,12 +39,17 @@ class ApiKey(DbModelABC):
def key(self) -> str:
return self._key
@property
def plain_key(self) -> str:
return CredentialManager.decrypt(self.key)
@async_property
async def permissions(self):
from cpl.auth.schema._permission.api_key_permission_dao import ApiKeyPermissionDao
api_key_permission_dao: ApiKeyPermissionDao = ServiceProviderABC.get_global_service(ApiKeyPermissionDao)
return [await x.permission for x in await api_key_permission_dao.find_by_api_key_id(self.id)]
apiKeyPermissionDao = ServiceProviderABC.get_global_provider().get_service(ApiKeyPermissionDao)
return [await x.permission for x in await apiKeyPermissionDao.find_by_api_key_id(self.id)]
async def has_permission(self, permission: Permissions) -> bool:
return permission.value in [x.name for x in await self.permissions]
@@ -52,7 +59,7 @@ class ApiKey(DbModelABC):
@staticmethod
def new_key() -> str:
return f"api_{secrets.token_urlsafe(Environment.get("API_KEY_LENGTH", int, 64))}"
return CredentialManager.encrypt(f"api_{secrets.token_urlsafe(Environment.get("API_KEY_LENGTH", int, 64))}")
@classmethod
def new(cls, identifier: str) -> "ApiKey":

View File

@@ -5,7 +5,7 @@ from typing import Optional
from async_property import async_property
from keycloak import KeycloakGetError
from cpl.auth import KeycloakAdmin
from cpl.auth.keycloak import KeycloakAdmin
from cpl.auth.auth_logger import AuthLogger
from cpl.auth.permission.permissions import Permissions
from cpl.core.typing import SerialId

View File

@@ -7,9 +7,7 @@ from typing import Any
from cpl.core.configuration.configuration_model_abc import ConfigurationModelABC
from cpl.core.console.console import Console
from cpl.core.console.foreground_color_enum import ForegroundColorEnum
from cpl.core.environment.environment import Environment
from cpl.core.typing import D, T
from cpl.core.utils.json_processor import JSONProcessor
class Configuration:
@@ -88,6 +86,8 @@ class Configuration:
if os.path.isabs(name):
file_path = name
else:
from cpl.core.environment import Environment
path_root = Environment.get_cwd()
if path is not None:
path_root = path
@@ -115,9 +115,7 @@ class Configuration:
if sub.__name__ != key and sub.__name__.replace("Settings", "") != key:
continue
configuration = JSONProcessor.process(sub, value)
cls.set(sub, configuration)
cls.set(sub, sub(value))
@classmethod
def set(cls, key: Any, value: T):

View File

@@ -1,7 +1,81 @@
from abc import ABC
from abc import ABC, abstractmethod
from typing import Optional, Type, Any
from cpl.core.typing import T
from cpl.core.utils.cast import cast
from cpl.core.utils.get_value import get_value
from cpl.core.utils.string import String
class ConfigurationModelABC(ABC):
r"""
ABC for configuration model classes
"""
@abstractmethod
def __init__(
self,
src: Optional[dict] = None,
env_prefix: Optional[str] = None,
readonly: bool = True,
):
ABC.__init__(self)
self._src = src or {}
self._options: dict[str, Any] = {}
self._env_prefix = env_prefix
self._readonly = readonly
def __setattr__(self, attr: str, value: Any):
if hasattr(self, "_readonly") and self._readonly:
raise AttributeError(f"Cannot set attribute: {attr}. {type(self).__name__} is read-only")
super().__setattr__(attr, value)
def __getattr__(self, attr: str) -> Any:
options = super().__getattribute__("_options")
if attr in options:
return options[attr]
return super().__getattribute__(attr)
def option(self, field: str, cast_type: Type[T], default=None, required=False, from_env=True):
value = None
field_variants = [
field,
String.first_to_upper(field),
String.first_to_lower(field),
String.to_camel_case(field),
String.to_snake_case(field),
String.first_to_upper(String.to_camel_case(field)),
]
value = None
for variant in field_variants:
if variant in self._src:
value = self._src[variant]
break
if value is None and from_env:
from cpl.core.environment import Environment
env_field = field.upper()
if self._env_prefix:
env_field = f"{self._env_prefix}_{env_field}"
value = Environment.get(env_field, cast_type)
if value is None and required:
raise ValueError(f"{field} is required")
elif value is None:
self._options[field] = default
return
self._options[field] = cast(value, cast_type)
def get(self, field: str, default=None) -> Optional[T]:
return get_value(self._src, field, self._options[field].type, default)
def to_dict(self) -> dict:
return {field: self.get(field) for field in self._options.keys()}

View File

@@ -1,9 +1,9 @@
import os
from socket import gethostname
from typing import Optional, Type
from typing import Type
from cpl.core.environment.environment_enum import EnvironmentEnum
from cpl.core.typing import T
from cpl.core.typing import T, D
from cpl.core.utils.get_value import get_value
@@ -55,14 +55,14 @@ class Environment:
os.environ[key] = str(value)
@staticmethod
def get(key: str, cast_type: Type[T], default: Optional[T] = None) -> Optional[T]:
def get(key: str, cast_type: Type[T], default: D = None) -> T | D:
"""
Get an environment variable and cast it to a specified type.
:param str key: The name of the environment variable.
:param Type[T] cast_type: A callable to cast the variable's value.
:param Optional[T] default: The default value to return if the variable is not found. Defaults to None.The default value to return if the variable is not found. Defaults to None.
:param T default: The default value to return if the variable is not found. Defaults to None.The default value to return if the variable is not found. Defaults to None.
:return: The casted value, or None if the variable is not found.
:rtype: Optional[T]
:rtype: T | D
"""
return get_value(dict(os.environ), key, cast_type, default)

View File

@@ -1,4 +1,4 @@
from .logger import Logger
from .logger_abc import LoggerABC
from .log_level_enum import LogLevelEnum
from .log_level_enum import LogLevel
from .logging_settings import LogSettings

View File

@@ -1,7 +1,7 @@
from enum import Enum
class LogLevelEnum(Enum):
class LogLevel(Enum):
off = "OFF" # Nothing
trace = "TRC" # Detailed app information's
debug = "DEB" # Detailed app state

View File

@@ -3,28 +3,31 @@ import traceback
from datetime import datetime
from cpl.core.console import Console
from cpl.core.log.log_level_enum import LogLevelEnum
from cpl.core.log.log_level_enum import LogLevel
from cpl.core.log.logger_abc import LoggerABC
from cpl.core.typing import Messages, Source
class Logger(LoggerABC):
_level = LogLevelEnum.info
_levels = [x for x in LogLevelEnum]
_level = LogLevel.info
_levels = [x for x in LogLevel]
# ANSI color codes for different log levels
_COLORS = {
LogLevelEnum.trace: "\033[37m", # Light Gray
LogLevelEnum.debug: "\033[94m", # Blue
LogLevelEnum.info: "\033[92m", # Green
LogLevelEnum.warning: "\033[93m", # Yellow
LogLevelEnum.error: "\033[91m", # Red
LogLevelEnum.fatal: "\033[95m", # Magenta
LogLevel.trace: "\033[37m", # Light Gray
LogLevel.debug: "\033[94m", # Blue
LogLevel.info: "\033[92m", # Green
LogLevel.warning: "\033[93m", # Yellow
LogLevel.error: "\033[91m", # Red
LogLevel.fatal: "\033[95m", # Magenta
}
def __init__(self, source: Source, file_prefix: str = None):
LoggerABC.__init__(self)
assert source is not None and source != "", "Source cannot be None or empty"
if source == LoggerABC.__name__:
source = None
self._source = source
if file_prefix is None:
@@ -45,7 +48,7 @@ class Logger(LoggerABC):
os.makedirs("logs")
@classmethod
def set_level(cls, level: LogLevelEnum):
def set_level(cls, level: LogLevel):
if level in cls._levels:
cls._level = level
else:
@@ -69,7 +72,7 @@ class Logger(LoggerABC):
log_file.write(content + "\n")
log_file.close()
def _log(self, level: LogLevelEnum, *messages: Messages):
def _log(self, level: LogLevel, *messages: Messages):
try:
if self._levels.index(level) < self._levels.index(self._level):
return
@@ -78,7 +81,7 @@ class Logger(LoggerABC):
formatted_message = self._format_message(level.value, timestamp, *messages)
self._write_log_to_file(formatted_message)
Console.write_line(f"{self._COLORS.get(self._level, '\033[0m')}{formatted_message}\033[0m")
Console.write_line(f"{self._COLORS.get(level, '\033[0m')}{formatted_message}\033[0m")
except Exception as e:
print(f"Error while logging: {e} -> {traceback.format_exc()}")
@@ -91,27 +94,35 @@ class Logger(LoggerABC):
messages = [str(message) for message in messages if message is not None]
return f"<{timestamp}> [{level.upper():^3}] [{self._file_prefix}] - [{self._source}]: {' '.join(messages)}"
message = f"<{timestamp}>"
message += f" [{level.upper():^3}]"
message += f" [{self._file_prefix}]"
if self._source is not None:
message += f" - [{self._source}]"
message += f": {' '.join(messages)}"
return message
def header(self, string: str):
self._log(LogLevelEnum.info, string)
self._log(LogLevel.info, string)
def trace(self, *messages: Messages):
self._log(LogLevelEnum.trace, *messages)
self._log(LogLevel.trace, *messages)
def debug(self, *messages: Messages):
self._log(LogLevelEnum.debug, *messages)
self._log(LogLevel.debug, *messages)
def info(self, *messages: Messages):
self._log(LogLevelEnum.info, *messages)
self._log(LogLevel.info, *messages)
def warning(self, *messages: Messages):
self._log(LogLevelEnum.warning, *messages)
self._log(LogLevel.warning, *messages)
def error(self, message, e: Exception = None):
self._log(LogLevelEnum.error, message, f"{e} -> {traceback.format_exc()}" if e else None)
self._log(LogLevel.error, message, f"{e} -> {traceback.format_exc()}" if e else None)
def fatal(self, message, e: Exception = None, prevent_quit: bool = False):
self._log(LogLevelEnum.fatal, message, f"{e} -> {traceback.format_exc()}" if e else None)
self._log(LogLevel.fatal, message, f"{e} -> {traceback.format_exc()}" if e else None)
if not prevent_quit:
exit(-1)

View File

@@ -1,5 +1,6 @@
from abc import abstractmethod, ABC
from cpl.core.log.log_level_enum import LogLevel
from cpl.core.typing import Messages
@@ -7,7 +8,7 @@ class LoggerABC(ABC):
r"""ABC for :class:`cpl.core.log.logger_service.Logger`"""
@abstractmethod
def set_level(self, level: str): ...
def set_level(self, level: LogLevel): ...
@abstractmethod
def _format_message(self, level: str, timestamp, *messages: Messages) -> str: ...

View File

@@ -1,53 +1,18 @@
from typing import Optional
from cpl.core.configuration.configuration_model_abc import ConfigurationModelABC
from cpl.core.log.log_level_enum import LogLevelEnum
from cpl.core.log.log_level_enum import LogLevel
class LogSettings(ConfigurationModelABC):
r"""Representation of logging settings"""
def __init__(
self,
path: str = None,
filename: str = None,
console_log_level: LogLevelEnum = None,
file_log_level: LogLevelEnum = None,
src: Optional[dict] = None,
):
ConfigurationModelABC.__init__(self)
self._path: Optional[str] = path
self._filename: Optional[str] = filename
self._console: Optional[LogLevelEnum] = console_log_level
self._level: Optional[LogLevelEnum] = file_log_level
ConfigurationModelABC.__init__(self, src)
@property
def path(self) -> str:
return self._path
@path.setter
def path(self, path: str) -> None:
self._path = path
@property
def filename(self) -> str:
return self._filename
@filename.setter
def filename(self, filename: str) -> None:
self._filename = filename
@property
def console(self) -> LogLevelEnum:
return self._console
@console.setter
def console(self, console: LogLevelEnum) -> None:
self._console = console
@property
def level(self) -> LogLevelEnum:
return self._level
@level.setter
def level(self, level: LogLevelEnum) -> None:
self._level = level
self.option("path", str, default="logs")
self.option("filename", str, default="app.log")
self.option("console_level", LogLevel, default=LogLevel.info)
self.option("level", LogLevel, default=LogLevel.info)

View File

@@ -3,3 +3,4 @@ from .credential_manager import CredentialManager
from .json_processor import JSONProcessor
from .pip import Pip
from .string import String
from .get_value import get_value

View File

@@ -0,0 +1,64 @@
from enum import Enum
from typing import Type, Any
from cpl.core.typing import T
def _cast_enum(value: str, enum_type: Type[Enum]) -> Enum:
try:
return enum_type(value)
except ValueError:
pass
try:
return enum_type(value.lower())
except ValueError:
pass
try:
return enum_type(value.upper())
except ValueError:
pass
try:
return enum_type[value]
except KeyError:
pass
try:
return enum_type[value.lower()]
except KeyError:
pass
try:
return enum_type[value.upper()]
except KeyError:
pass
raise ValueError(f"Cannot cast value '{value}' to enum '{enum_type.__name__}'")
def cast(value: Any, cast_type: Type[T], list_delimiter: str = ",") -> T:
"""
Cast a value to a specified type.
:param Any value: Value to be casted.
:param Type[T] cast_type: A callable to cast the variable's value.
:param str list_delimiter: The delimiter to split the value into a list. Defaults to ",".
:return:
"""
if cast_type == bool:
return value.lower() in ["true", "1", "yes", "on"]
if issubclass(cast_type, Enum):
return _cast_enum(value, cast_type)
if (cast_type if not hasattr(cast_type, "__origin__") else cast_type.__origin__) == list:
if not (value.startswith("[") and value.endswith("]")) and list_delimiter not in value:
raise ValueError("List values must be enclosed in square brackets or use a delimiter.")
if value.startswith("[") and value.endswith("]"):
value = value[1:-1]
value = value.split(list_delimiter)
subtype = cast_type.__args__[0] if hasattr(cast_type, "__args__") else None
return [subtype(item) if subtype is not None else item for item in value]
return cast_type(value)

View File

@@ -1,12 +1,42 @@
import base64
import os
from cryptography.fernet import Fernet
from cpl.core.log.logger import Logger
_logger = Logger(__name__)
class CredentialManager:
r"""Handles credential encryption and decryption"""
@staticmethod
def encrypt(string: str) -> str:
r"""Encode with base64
_secret: str = None
@classmethod
def with_secret(cls, file: str = None):
if file is None:
file = ".secret"
if not os.path.isfile(file):
dirname = os.path.dirname(file)
if dirname != "":
os.makedirs(dirname, exist_ok=True)
with open(file, "w") as secret_file:
secret_file.write(Fernet.generate_key().decode())
secret_file.close()
_logger.warning("Secret file not found, regenerating")
with open(file, "r") as secret_file:
secret = secret_file.read().strip()
if secret == "" or secret is None:
_logger.fatal("No secret found in .secret file.")
cls._secret = str(secret)
@classmethod
def encrypt(cls, string: str) -> str:
r"""Encode with Fernet
Parameter:
string: :class:`str`
@@ -15,11 +45,11 @@ class CredentialManager:
Returns:
Encoded string
"""
return base64.b64encode(string.encode("utf-8")).decode("utf-8")
return Fernet(cls._secret).encrypt(string.encode()).decode()
@staticmethod
def decrypt(string: str) -> str:
r"""Decode with base64
@classmethod
def decrypt(cls, string: str) -> str:
r"""Decode with Fernet
Parameter:
string: :class:`str`
@@ -28,19 +58,4 @@ class CredentialManager:
Returns:
Decoded string
"""
return base64.b64decode(string).decode("utf-8")
@staticmethod
def build_string(string: str, credentials: str):
r"""Builds string with credentials in it
Parameter:
string: :class:`str`
String in which the variable is replaced by credentials
credentials: :class:`str`
String to encode
Returns:
Decoded string
"""
return string.replace("$credentials", CredentialManager.decrypt(credentials))
return Fernet(cls._secret).decrypt(string).decode()

View File

@@ -1,6 +1,7 @@
from typing import Type, Optional
from cpl.core.typing import T
from cpl.core.utils.cast import cast
def get_value(
@@ -37,20 +38,6 @@ def get_value(
return value
try:
if cast_type == bool:
return value.lower() in ["true", "1"]
if (cast_type if not hasattr(cast_type, "__origin__") else cast_type.__origin__) == list:
if not (value.startswith("[") and value.endswith("]")) and list_delimiter not in value:
raise ValueError("List values must be enclosed in square brackets or use a delimiter.")
if value.startswith("[") and value.endswith("]"):
value = value[1:-1]
value = value.split(list_delimiter)
subtype = cast_type.__args__[0] if hasattr(cast_type, "__args__") else None
return [subtype(item) if subtype is not None else item for item in value]
return cast_type(value)
cast(cast_type, value, list_delimiter)
except (ValueError, TypeError):
return default

View File

@@ -17,7 +17,11 @@ class String:
Returns:
String converted to CamelCase
"""
return re.sub(r"(?<!^)(?=[A-Z])", "_", s).lower()
s = String.to_snake_case(s)
words = s.split('_')
return words[0] + ''.join(word.title() for word in words[1:])
# return re.sub(r"(?<!^)(?=[A-Z])", "_", s).lower()
@staticmethod
def to_snake_case(chars: str) -> str:

View File

@@ -1,11 +1,33 @@
from typing import Type
from cpl.application.abc import ApplicationABC as _ApplicationABC
from cpl.dependency import ServiceCollection as _ServiceCollection
from . import mysql as _mysql
from . import postgres as _postgres
from .table_manager import TableManager
def _with_migrations(self: _ApplicationABC, *paths: list[str]) -> _ApplicationABC:
from cpl.application.host import Host
from cpl.database.service.migration_service import MigrationService
migration_service = self._services.get_service(MigrationService)
migration_service.with_directory("./scripts")
Host.run(migration_service.migrate)
return self
def _with_seeders(self: _ApplicationABC) -> _ApplicationABC:
from cpl.database.service.seeder_service import SeederService
from cpl.application.host import Host
seeder_service: SeederService = self._services.get_service(SeederService)
Host.run(seeder_service.seed)
return self
def _add(collection: _ServiceCollection, db_context: Type, default_port: int, server_type: str):
from cpl.core.console import Console
from cpl.core.configuration import Configuration
@@ -44,3 +66,5 @@ def add_postgres(collection: _ServiceCollection):
_ServiceCollection.with_module(add_mysql, _mysql.__name__)
_ServiceCollection.with_module(add_postgres, _postgres.__name__)
_ApplicationABC.extend(_ApplicationABC.with_migrations, _with_migrations)
_ApplicationABC.extend(_ApplicationABC.with_seeders, _with_seeders)

View File

@@ -6,7 +6,7 @@ from typing import Generic, Optional, Union, Type, List, Any
from cpl.core.ctx import get_user
from cpl.core.typing import T, Id
from cpl.core.utils import String
from cpl.core.utils.string import String
from cpl.core.utils.get_value import get_value
from cpl.database.abc.db_context_abc import DBContextABC
from cpl.database.const import DATETIME_FORMAT

View File

@@ -2,75 +2,23 @@ from typing import Optional
from cpl.core.configuration import Configuration
from cpl.core.configuration.configuration_model_abc import ConfigurationModelABC
from cpl.core.environment import Environment
from cpl.core.utils import Base64
class DatabaseSettings(ConfigurationModelABC):
r"""Represents settings for the database connection"""
def __init__(
self,
host: str = Environment.get("DB_HOST", str),
port: int = Environment.get("DB_PORT", str, Configuration.get("DB_DEFAULT_PORT", 0)),
user: str = Environment.get("DB_USER", str),
password: str = Environment.get("DB_PASSWORD", str),
database: str = Environment.get("DB_DATABASE", str),
charset: str = Environment.get("DB_CHARSET", str, "utf8mb4"),
use_unicode: bool = Environment.get("DB_USE_UNICODE", bool, False),
buffered: bool = Environment.get("DB_BUFFERED", bool, False),
auth_plugin: str = Environment.get("DB_AUTH_PLUGIN", str, "caching_sha2_password"),
ssl_disabled: bool = Environment.get("DB_SSL_DISABLED", bool, False),
src: Optional[dict] = None,
):
ConfigurationModelABC.__init__(self)
ConfigurationModelABC.__init__(self, src, "DB")
self._host: Optional[str] = host
self._port: Optional[int] = port
self._user: Optional[str] = user
self._password: Optional[str] = Base64.decode(password) if Base64.is_b64(password) else password
self._database: Optional[str] = database
self._charset: Optional[str] = charset
self._use_unicode: Optional[bool] = use_unicode
self._buffered: Optional[bool] = buffered
self._auth_plugin: Optional[str] = auth_plugin
self._ssl_disabled: Optional[bool] = ssl_disabled
@property
def host(self) -> Optional[str]:
return self._host
@property
def port(self) -> Optional[int]:
return self._port
@property
def user(self) -> Optional[str]:
return self._user
@property
def password(self) -> Optional[str]:
return self._password
@property
def database(self) -> Optional[str]:
return self._database
@property
def charset(self) -> Optional[str]:
return self._charset
@property
def use_unicode(self) -> Optional[bool]:
return self._use_unicode
@property
def buffered(self) -> Optional[bool]:
return self._buffered
@property
def auth_plugin(self) -> Optional[str]:
return self._auth_plugin
@property
def ssl_disabled(self) -> Optional[bool]:
return self._ssl_disabled
self.option("host", str, required=True)
self.option("port", int, Configuration.get("DB_DEFAULT_PORT"), required=True)
self.option("user", str, required=True)
self.option("password", str, required=True)
self.option("database", str, required=True)
self.option("charset", str, "utf8mb4")
self.option("use_unicode", bool, False)
self.option("buffered", bool, False)
self.option("auth_plugin", str, "caching_sha2_password")
self.option("ssl_disabled", bool, False)

View File

@@ -5,8 +5,7 @@ from mysql.connector.abstracts import MySQLConnectionAbstract
from mysql.connector.cursor import MySQLCursorBuffered
from cpl.database.abc.connection_abc import ConnectionABC
from cpl.database.database_settings import DatabaseSettings
from cpl.core.utils.credential_manager import CredentialManager
from cpl.database.model.database_settings import DatabaseSettings
class DatabaseConnection(ConnectionABC):
@@ -31,7 +30,7 @@ class DatabaseConnection(ConnectionABC):
host=settings.host,
port=settings.port,
user=settings.user,
passwd=CredentialManager.decrypt(settings.password),
passwd=settings.password,
charset=settings.charset,
use_unicode=settings.use_unicode,
buffered=settings.buffered,
@@ -43,7 +42,7 @@ class DatabaseConnection(ConnectionABC):
host=settings.host,
port=settings.port,
user=settings.user,
passwd=CredentialManager.decrypt(settings.password),
passwd=settings.password,
db=settings.database,
charset=settings.charset,
use_unicode=settings.use_unicode,

View File

@@ -2,7 +2,6 @@ from cpl.dependency import ServiceCollection as _ServiceCollection
from .abc.email_client_abc import EMailClientABC
from .email_client import EMailClient
from .email_client_settings import EMailClientSettings
from .email_client_settings_name_enum import EMailClientSettingsNameEnum
from .email_model import EMail
from .mail_logger import MailLogger

View File

@@ -2,7 +2,6 @@ import ssl
from smtplib import SMTP
from typing import Optional
from cpl.core.utils.credential_manager import CredentialManager
from cpl.mail.abc.email_client_abc import EMailClientABC
from cpl.mail.email_client_settings import EMailClientSettings
from cpl.mail.email_model import EMail
@@ -62,9 +61,7 @@ class EMailClient(EMailClientABC):
__name__,
f"Try to login {self._mail_settings.user_name}@{self._mail_settings.host}:{self._mail_settings.port}",
)
self._server.login(
self._mail_settings.user_name, CredentialManager.decrypt(self._mail_settings.credentials)
)
self._server.login(self._mail_settings.user_name, self._mail_settings.credentials)
self._logger.info(
__name__,
f"Logged on as {self._mail_settings.user_name} to {self._mail_settings.host}:{self._mail_settings.port}",

View File

@@ -1,51 +1,17 @@
from typing import Optional
from cpl.core.configuration.configuration_model_abc import ConfigurationModelABC
class EMailClientSettings(ConfigurationModelABC):
r"""Representation of mailing settings"""
def __init__(
self,
host: str = None,
port: int = None,
user_name: str = None,
credentials: str = None,
self,
src: Optional[dict] = None,
):
ConfigurationModelABC.__init__(self)
ConfigurationModelABC.__init__(self, src, "EMAIL")
self._host: str = host
self._port: int = port
self._user_name: str = user_name
self._credentials: str = credentials
@property
def host(self) -> str:
return self._host
@host.setter
def host(self, host: str) -> None:
self._host = host
@property
def port(self) -> int:
return self._port
@port.setter
def port(self, port: int) -> None:
self._port = port
@property
def user_name(self) -> str:
return self._user_name
@user_name.setter
def user_name(self, user_name: str) -> None:
self._user_name = user_name
@property
def credentials(self) -> str:
return self._credentials
@credentials.setter
def credentials(self, credentials: str) -> None:
self._credentials = credentials
self.option("host", str, required=True)
self.option("port", int, 587, required=True)
self.option("user_name", str, required=True)
self.option("credentials", str, required=True)

View File

@@ -1,8 +0,0 @@
from enum import Enum
class EMailClientSettingsNameEnum(Enum):
host = "Host"
port = "Port"
user_name = "UserName"
credentials = "Credentials"

View File

@@ -1,5 +1,5 @@
{
"ProjectSettings": {
"Project": {
"Name": "database",
"Version": {
"Major": "0",
@@ -22,7 +22,7 @@
"PythonPath": {},
"Classifiers": []
},
"BuildSettings": {
"Build": {
"ProjectType": "console",
"SourcePath": "src",
"OutputPath": "dist",

View File

@@ -1,5 +1,5 @@
from cpl.application.abc.application_abc import ApplicationABC
from cpl.auth import KeycloakAdmin
from cpl.application.abc import ApplicationABC
from cpl.auth.keycloak import KeycloakAdmin
from cpl.core.console import Console
from cpl.core.environment import Environment
from cpl.core.log import LoggerABC
@@ -14,7 +14,7 @@ class Application(ApplicationABC):
def __init__(self, services: ServiceProviderABC):
ApplicationABC.__init__(self, services)
self._logger: LoggerABC = services.get_service(LoggerABC)
self._logger = services.get_service(LoggerABC)
async def test_daos(self):
userDao: UserDao = self._services.get_service(UserDao)

View File

@@ -1,8 +1,8 @@
{
"LoggingSettings": {
"Logging": {
"Path": "logs/",
"Filename": "log_$start_time.log",
"ConsoleLogLevel": "TRACE",
"FileLogLevel": "TRACE"
"ConsoleLevel": "TRACE",
"Level": "TRACE"
}
}

View File

@@ -1,19 +1,19 @@
{
"TimeFormatSettings": {
"TimeFormat": {
"DateFormat": "%Y-%m-%d",
"TimeFormat": "%H:%M:%S",
"DateTimeFormat": "%Y-%m-%d %H:%M:%S.%f",
"DateTimeLogFormat": "%Y-%m-%d_%H-%M-%S"
},
"LoggingSettings": {
"Log": {
"Path": "logs/",
"Filename": "log_$start_time.log",
"ConsoleLogLevel": "TRACE",
"FileLogLevel": "TRACE"
"ConsoleLevel": "TRACE",
"Level": "TRACE"
},
"DatabaseSettings": {
"Database": {
"AuthPlugin": "mysql_native_password",
"ConnectionString": "mysql+mysqlconnector://cpl:$credentials@localhost/cpl",
"Credentials": "Y3Bs",

View File

@@ -1,23 +1,23 @@
{
"TimeFormatSettings": {
"TimeFormat": {
"DateFormat": "%Y-%m-%d",
"TimeFormat": "%H:%M:%S",
"DateTimeFormat": "%Y-%m-%d %H:%M:%S.%f",
"DateTimeLogFormat": "%Y-%m-%d_%H-%M-%S"
},
"LoggingSettings": {
"Log": {
"Path": "logs/",
"Filename": "log_$start_time.log",
"ConsoleLogLevel": "TRACE",
"FileLogLevel": "TRACE"
"ConsoleLevel": "TRACE",
"Level": "TRACE"
},
"DatabaseSettings": {
"Database": {
"Host": "localhost",
"User": "cpl",
"Port": 3306,
"Password": "Y3Bs",
"Password": "cpl",
"Database": "cpl",
"Charset": "utf8mb4",
"UseUnicode": "true",

View File

@@ -1,15 +1,15 @@
{
"TimeFormatSettings": {
"TimeFormat": {
"DateFormat": "%Y-%m-%d",
"TimeFormat": "%H:%M:%S",
"DateTimeFormat": "%Y-%m-%d %H:%M:%S.%f",
"DateTimeLogFormat": "%Y-%m-%d_%H-%M-%S"
},
"LoggingSettings": {
"Log": {
"Path": "logs/",
"Filename": "log_$start_time.log",
"ConsoleLogLevel": "ERROR",
"FileLogLevel": "WARN"
"ConsoleLevel": "ERROR",
"Level": "WARNING"
}
}

View File

@@ -1,11 +1,17 @@
from application import Application
from cpl.application import ApplicationBuilder
from custom_permissions import CustomPermissions
from startup import Startup
def main():
builder = ApplicationBuilder(Application).use_startup(Startup)
builder = ApplicationBuilder(Application).with_startup(Startup)
app = builder.build()
app.with_logging()
app.with_permissions(CustomPermissions)
app.with_migrations("./scripts")
app.with_seeders()
app.run()

View File

@@ -1,12 +1,24 @@
from application import Application
from cpl.application import ApplicationBuilder
from cpl.auth.permission.permissions_registry import PermissionsRegistry
from cpl.core.console import Console
from cpl.core.log import LogLevel
from custom_permissions import CustomPermissions
from startup import Startup
def main():
builder = ApplicationBuilder(Application).use_startup(Startup)
builder = ApplicationBuilder(Application).with_startup(Startup)
builder.services.add_logging()
app = builder.build()
app.with_logging(LogLevel.trace)
app.with_permissions(CustomPermissions)
app.with_migrations("./scripts")
app.with_seeders()
Console.write_line(CustomPermissions.test.value in PermissionsRegistry.get())
app.run()
Console.write_line("Hello from main_simplified.py!")

View File

@@ -1,30 +1,26 @@
from cpl import auth
from cpl.application.abc.startup_abc import StartupABC
from cpl.auth import permission
from cpl.auth.permission.permissions_registry import PermissionsRegistry
from cpl.core.configuration import Configuration
from cpl.core.environment import Environment
from cpl.core.log import Logger, LoggerABC
from cpl.database import mysql
from cpl.database.abc.data_access_object_abc import DataAccessObjectABC
from cpl.database.service.migration_service import MigrationService
from cpl.database.service.seeder_service import SeederService
from cpl.dependency import ServiceCollection
from custom_permissions import CustomPermissions
from model.city_dao import CityDao
from model.user_dao import UserDao
class Startup(StartupABC):
def __init__(self):
StartupABC.__init__(self)
async def configure_configuration(self):
@staticmethod
async def configure_configuration():
Configuration.add_json_file(f"appsettings.json")
Configuration.add_json_file(f"appsettings.{Environment.get_environment()}.json")
Configuration.add_json_file(f"appsettings.{Environment.get_host_name()}.json", optional=True)
async def configure_services(self, services: ServiceCollection):
@staticmethod
async def configure_services(services: ServiceCollection):
services.add_module(mysql)
services.add_module(auth)
services.add_module(permission)
@@ -33,13 +29,3 @@ class Startup(StartupABC):
services.add_transient(DataAccessObjectABC, CityDao)
services.add_singleton(LoggerABC, Logger)
PermissionsRegistry.with_enum(CustomPermissions)
provider = services.build()
migration_service: MigrationService = provider.get_service(MigrationService)
migration_service.with_directory("./scripts")
await migration_service.migrate()
seeder_service: SeederService = provider.get_service(SeederService)
await seeder_service.seed()

View File

@@ -1,15 +1,15 @@
{
"TimeFormatSettings": {
"TimeFormat": {
"DateFormat": "%Y-%m-%d",
"TimeFormat": "%H:%M:%S",
"DateTimeFormat": "%Y-%m-%d %H:%M:%S.%f",
"DateTimeLogFormat": "%Y-%m-%d_%H-%M-%S"
},
"LoggingSettings": {
"Logging": {
"Path": "logs/",
"Filename": "log_$start_time.log",
"ConsoleLogLevel": "ERROR",
"FileLogLevel": "WARN"
"ConsoleLevel": "ERROR",
"Level": "WARN"
}
}

View File

@@ -1,5 +1,5 @@
{
"WorkspaceSettings": {
"Workspace": {
"DefaultProject": "di",
"Projects": {
"di": "src/di/di.json"

View File

@@ -1,5 +1,5 @@
{
"ProjectSettings": {
"Project": {
"Name": "di",
"Version": {
"Major": "0",
@@ -25,7 +25,7 @@
"PythonPath": {},
"Classifiers": []
},
"BuildSettings": {
"Build": {
"ProjectType": "console",
"SourcePath": "",
"OutputPath": "../../dist",

View File

@@ -6,7 +6,7 @@ from di.startup import Startup
def main():
app_builder = ApplicationBuilder(Application)
app_builder.use_startup(Startup)
app_builder.with_startup(Startup)
app_builder.build().run()

View File

@@ -1,8 +1,8 @@
{
"LoggingSettings": {
"Logging": {
"Path": "logs/",
"Filename": "log_$start_time.log",
"ConsoleLogLevel": "TRACE",
"FileLogLevel": "TRACE"
"ConsoleLevel": "TRACE",
"Level": "TRACE"
}
}

View File

@@ -1,63 +1,20 @@
{
"TimeFormatSettings": {
"TimeFormat": {
"DateFormat": "%Y-%m-%d",
"TimeFormat": "%H:%M:%S",
"DateTimeFormat": "%Y-%m-%d %H:%M:%S.%f",
"DateTimeLogFormat": "%Y-%m-%d_%H-%M-%S"
},
"LoggingSettings": {
"Logging": {
"Path": "logs/",
"Filename": "log_$start_time.log",
"ConsoleLogLevel": "TRACE",
"FileLogLevel": "TRACE"
"ConsoleLevel": "TRACE",
"Level": "TRACE"
},
"EMailClientSettings": {
"EMailClient": {
"Host": "mail.sh-edraft.de",
"Port": "587",
"UserName": "dev-srv@sh-edraft.de",
"Credentials": "RmBOQX1eNFYiYjgsSid3fV1nelc2WA=="
},
"PublishSettings": {
"SourcePath": "../",
"DistPath": "../../dist",
"Templates": [
{
"TemplatePath": "../../publish_templates/all_template.txt",
"Name": "all",
"Description": "",
"LongDescription": "",
"CopyrightDate": "2020",
"CopyrightName": "sh-edraft.de",
"LicenseName": "MIT",
"LicenseDescription": ", see LICENSE for more details.",
"Title": "",
"Author": "Sven Heidemann",
"Version": {
"Major": 2020,
"Minor": 12,
"Micro": 9
}
},
{
"TemplatePath": "../../publish_templates/all_template.txt",
"Name": "sh_edraft",
"Description": "common python library",
"LongDescription": "Library to share common classes and models used at sh-edraft.de",
"CopyrightDate": "2020",
"CopyrightName": "sh-edraft.de",
"LicenseName": "MIT",
"LicenseDescription": ", see LICENSE for more details.",
"Title": "",
"Author": "Sven Heidemann",
"Version": {
"Major": 2020,
"Minor": 12,
"Micro": 9
}
}
],
"IncludedFiles": [],
"ExcludedFiles": [],
"TemplateEnding": "_template.txt"
}
}

View File

@@ -1,26 +1,26 @@
{
"TimeFormatSettings": {
"TimeFormat": {
"DateFormat": "%Y-%m-%d",
"TimeFormat": "%H:%M:%S",
"DateTimeFormat": "%Y-%m-%d %H:%M:%S.%f",
"DateTimeLogFormat": "%Y-%m-%d_%H-%M-%S"
},
"LoggingSettings": {
"Logging": {
"Path": "logs/",
"Filename": "log_$start_time.log",
"ConsoleLogLevel": "TRACE",
"FileLogLevel": "TRACE"
"ConsoleLevel": "TRACE",
"Level": "TRACE"
},
"EMailClientSettings": {
"EMailClient": {
"Host": "mail.sh-edraft.de",
"Port": "587",
"UserName": "dev-srv@sh-edraft.de",
"Credentials": "RmBOQX1eNFYiYjgsSid3fV1nelc2WA=="
},
"DatabaseSettings": {
"Database": {
"Host": "localhost",
"User": "sh_cpl",
"Password": "MHZhc0Y2bjhKc1VUMWV0Qw==",
@@ -31,7 +31,7 @@
"AuthPlugin": "mysql_native_password"
},
"TestSettings": {
"Test": {
"Value": 20
}
}

View File

@@ -1,15 +1,15 @@
{
"TimeFormatSettings": {
"TimeFormat": {
"DateFormat": "%Y-%m-%d",
"TimeFormat": "%H:%M:%S",
"DateTimeFormat": "%Y-%m-%d %H:%M:%S.%f",
"DateTimeLogFormat": "%Y-%m-%d_%H-%M-%S"
},
"LoggingSettings": {
"Logging": {
"Path": "logs/",
"Filename": "log_$start_time.log",
"ConsoleLogLevel": "ERROR",
"FileLogLevel": "WARN"
"ConsoleLevel": "ERROR",
"Level": "WARN"
}
}

View File

@@ -1,5 +1,5 @@
{
"ProjectSettings": {
"Project": {
"Name": "general",
"Version": {
"Major": "2021",
@@ -30,7 +30,7 @@
},
"Classifiers": []
},
"BuildSettings": {
"Build": {
"ProjectType": "console",
"SourcePath": "",
"OutputPath": "dist",

View File

@@ -7,9 +7,9 @@ from test_startup_extension import TestStartupExtension
def main():
app_builder = ApplicationBuilder(Application)
app_builder.use_startup(Startup)
app_builder.use_extension(TestStartupExtension)
app_builder.use_extension(TestExtension)
app_builder.with_startup(Startup)
app_builder.with_extension(TestStartupExtension)
app_builder.with_extension(TestExtension)
app_builder.build().run()

View File

@@ -1,22 +1,22 @@
from cpl import mail
from cpl.application.abc import StartupABC
from cpl.core.configuration import Configuration
from cpl.dependency import ServiceCollection, ServiceProviderABC
from cpl.core.environment import Environment
from cpl.core.pipes import IPAddressPipe
from cpl.dependency import ServiceCollection
from test_service import TestService
class Startup(StartupABC):
def __init__(self):
StartupABC.__init__(self)
def configure_configuration(selft):
@staticmethod
def configure_configuration():
Configuration.add_json_file(f"appsettings.json")
Configuration.add_json_file(f"appsettings.{Environment.get_environment()}.json")
Configuration.add_json_file(f"appsettings.{Environment.get_host_name()}.json", optional=True)
def configure_services(self, services: ServiceCollection):
@staticmethod
def configure_services(services: ServiceCollection):
services.add_logging()
services.add_module(mail)
services.add_transient(IPAddressPipe)

View File

@@ -4,8 +4,7 @@ from cpl.dependency import ServiceProviderABC
class TestExtension(ApplicationExtensionABC):
def __init__(self):
ApplicationExtensionABC.__init__(self)
def run(self, services: ServiceProviderABC):
@staticmethod
def run(services: ServiceProviderABC):
Console.write_line("Hello World from App Extension")

View File

@@ -1,16 +1,14 @@
from cpl.application.abc import StartupExtensionABC
from cpl.core.configuration import Configuration
from cpl.core.console import Console
from cpl.dependency import ServiceCollection
from cpl.core.environment import Environment
class TestStartupExtension(StartupExtensionABC):
def __init__(self):
StartupExtensionABC.__init__(self)
def configure_configuration(self):
@staticmethod
def configure_configuration():
Console.write_line("config")
def configure_services(self, services: ServiceCollection):
@staticmethod
def configure_services(services: ServiceCollection):
Console.write_line("services")

View File

@@ -1,5 +1,5 @@
{
"WorkspaceSettings": {
"Workspace": {
"DefaultProject": "translation",
"Projects": {
"translation": "src/translation/translation.json"

View File

@@ -1,16 +1,16 @@
{
"TimeFormatSettings": {
"TimeFormat": {
"DateFormat": "%Y-%m-%d",
"TimeFormat": "%H:%M:%S",
"DateTimeFormat": "%Y-%m-%d %H:%M:%S.%f",
"DateTimeLogFormat": "%Y-%m-%d_%H-%M-%S"
},
"LoggingSettings": {
"Logging": {
"Path": "logs/",
"Filename": "log_$start_time.log",
"ConsoleLogLevel": "ERROR",
"FileLogLevel": "WARN"
"ConsoleLevel": "ERROR",
"Level": "WARN"
},
"Translation": {

View File

@@ -6,7 +6,7 @@ from translation.startup import Startup
def main():
app_builder = ApplicationBuilder(Application)
app_builder.use_startup(Startup)
app_builder.with_startup(Startup)
app_builder.build().run()

View File

@@ -1,5 +1,5 @@
{
"ProjectSettings": {
"Project": {
"Name": "translation",
"Version": {
"Major": "0",
@@ -25,7 +25,7 @@
"PythonPath": {},
"Classifiers": []
},
"BuildSettings": {
"Build": {
"ProjectType": "console",
"SourcePath": "",
"OutputPath": "../../dist",

View File

@@ -1,5 +1,5 @@
{
"ProjectSettings": {
"Project": {
"Name": "unittests",
"Version": {
"Major": "2024",
@@ -23,7 +23,7 @@
"Classifiers": [],
"DevDependencies": []
},
"BuildSettings": {
"Build": {
"ProjectType": "unittest",
"SourcePath": "",
"OutputPath": "../../dist",

View File

@@ -1,5 +1,5 @@
{
"ProjectSettings": {
"Project": {
"Name": "unittest_cli",
"Version": {
"Major": "2024",
@@ -24,7 +24,7 @@
"Classifiers": [],
"DevDependencies": []
},
"BuildSettings": {
"Build": {
"ProjectType": "library",
"SourcePath": "",
"OutputPath": "../../dist",

View File

@@ -1,17 +1,17 @@
{
"TimeFormatSettings": {
"TimeFormat": {
"DateFormat": "%Y-%m-%d",
"TimeFormat": "%H:%M:%S",
"DateTimeFormat": "%Y-%m-%d %H:%M:%S.%f",
"DateTimeLogFormat": "%Y-%m-%d_%H-%M-%S"
},
"LoggingSettings": {
"Logging": {
"Path": "logs/$date_now/",
"Filename": "bot.log",
"ConsoleLogLevel": "TRACE",
"FileLogLevel": "TRACE"
"ConsoleLevel": "TRACE",
"Level": "TRACE"
},
"DatabaseSettings": {
"Database": {
"Host": "localhost",
"User": "local",
"Password": "bG9jYWw=",

View File

@@ -1,5 +1,5 @@
{
"ProjectSettings": {
"Project": {
"Name": "unittest_core",
"Version": {
"Major": "2024",
@@ -23,7 +23,7 @@
"Classifiers": [],
"DevDependencies": []
},
"BuildSettings": {
"Build": {
"ProjectType": "library",
"SourcePath": "",
"OutputPath": "../../dist",

View File

@@ -6,34 +6,30 @@ from cpl.core.utils import CredentialManager
class CredentialManagerTestCase(unittest.TestCase):
def setUp(self): ...
def test_encrypt(self):
self.assertEqual("ZkVjSkplQUx4aW1zWHlPbA==", CredentialManager.encrypt("fEcJJeALximsXyOl"))
self.assertEqual("QmtVd1l4dW5Sck9jRmVTQQ==", CredentialManager.encrypt("BkUwYxunRrOcFeSA"))
self.assertEqual("c2FtaHF1VkNSdmZpSGxDcQ==", CredentialManager.encrypt("samhquVCRvfiHlCq"))
self.assertEqual("S05aWHBPYW9DbkRSV01rWQ==", CredentialManager.encrypt("KNZXpOaoCnDRWMkY"))
self.assertEqual("QmtUV0Zsb3h1Y254UkJWeg==", CredentialManager.encrypt("BkTWFloxucnxRBVz"))
self.assertEqual("VFdNTkRuYXB1b1dndXNKdw==", CredentialManager.encrypt("TWMNDnapuoWgusJw"))
self.assertEqual("WVRiQXVSZXRMblpicWNrcQ==", CredentialManager.encrypt("YTbAuRetLnZbqckq"))
self.assertEqual("bmN4aExackxhYUVVdnV2VA==", CredentialManager.encrypt("ncxhLZrLaaEUvuvT"))
self.assertEqual("dmpNT0J5U0lLQmFrc0pIYQ==", CredentialManager.encrypt("vjMOBySIKBaksJHa"))
self.assertEqual("ZHd6WHFzSlFvQlhRbGtVZw==", CredentialManager.encrypt("dwzXqsJQoBXQlkUg"))
self.assertEqual("Q0lmUUhOREtiUmxnY2VCbQ==", CredentialManager.encrypt("CIfQHNDKbRlgceBm"))
def test_encrypt(self): ...
def test_decrypt(self):
self.assertEqual("fEcJJeALximsXyOl", CredentialManager.decrypt("ZkVjSkplQUx4aW1zWHlPbA=="))
self.assertEqual("BkUwYxunRrOcFeSA", CredentialManager.decrypt("QmtVd1l4dW5Sck9jRmVTQQ=="))
self.assertEqual("samhquVCRvfiHlCq", CredentialManager.decrypt("c2FtaHF1VkNSdmZpSGxDcQ=="))
self.assertEqual("KNZXpOaoCnDRWMkY", CredentialManager.decrypt("S05aWHBPYW9DbkRSV01rWQ=="))
self.assertEqual("BkTWFloxucnxRBVz", CredentialManager.decrypt("QmtUV0Zsb3h1Y254UkJWeg=="))
self.assertEqual("TWMNDnapuoWgusJw", CredentialManager.decrypt("VFdNTkRuYXB1b1dndXNKdw=="))
self.assertEqual("YTbAuRetLnZbqckq", CredentialManager.decrypt("WVRiQXVSZXRMblpicWNrcQ=="))
self.assertEqual("ncxhLZrLaaEUvuvT", CredentialManager.decrypt("bmN4aExackxhYUVVdnV2VA=="))
self.assertEqual("vjMOBySIKBaksJHa", CredentialManager.decrypt("dmpNT0J5U0lLQmFrc0pIYQ=="))
self.assertEqual("dwzXqsJQoBXQlkUg", CredentialManager.decrypt("ZHd6WHFzSlFvQlhRbGtVZw=="))
self.assertEqual("CIfQHNDKbRlgceBm", CredentialManager.decrypt("Q0lmUUhOREtiUmxnY2VCbQ=="))
# self.assertEqual("ZkVjSkplQUx4aW1zWHlPbA==", CredentialManager.encrypt("fEcJJeALximsXyOl"))
# self.assertEqual("QmtVd1l4dW5Sck9jRmVTQQ==", CredentialManager.encrypt("BkUwYxunRrOcFeSA"))
# self.assertEqual("c2FtaHF1VkNSdmZpSGxDcQ==", CredentialManager.encrypt("samhquVCRvfiHlCq"))
# self.assertEqual("S05aWHBPYW9DbkRSV01rWQ==", CredentialManager.encrypt("KNZXpOaoCnDRWMkY"))
# self.assertEqual("QmtUV0Zsb3h1Y254UkJWeg==", CredentialManager.encrypt("BkTWFloxucnxRBVz"))
# self.assertEqual("VFdNTkRuYXB1b1dndXNKdw==", CredentialManager.encrypt("TWMNDnapuoWgusJw"))
# self.assertEqual("WVRiQXVSZXRMblpicWNrcQ==", CredentialManager.encrypt("YTbAuRetLnZbqckq"))
# self.assertEqual("bmN4aExackxhYUVVdnV2VA==", CredentialManager.encrypt("ncxhLZrLaaEUvuvT"))
# self.assertEqual("dmpNT0J5U0lLQmFrc0pIYQ==", CredentialManager.encrypt("vjMOBySIKBaksJHa"))
# self.assertEqual("ZHd6WHFzSlFvQlhRbGtVZw==", CredentialManager.encrypt("dwzXqsJQoBXQlkUg"))
# self.assertEqual("Q0lmUUhOREtiUmxnY2VCbQ==", CredentialManager.encrypt("CIfQHNDKbRlgceBm"))
def test_build_string(self):
self.assertEqual(
"TestStringWithCredentialsfEcJJeALximsXyOlHere",
CredentialManager.build_string("TestStringWithCredentials$credentialsHere", "ZkVjSkplQUx4aW1zWHlPbA=="),
)
def test_decrypt(self): ...
# self.assertEqual("fEcJJeALximsXyOl", CredentialManager.decrypt("ZkVjSkplQUx4aW1zWHlPbA=="))
# self.assertEqual("BkUwYxunRrOcFeSA", CredentialManager.decrypt("QmtVd1l4dW5Sck9jRmVTQQ=="))
# self.assertEqual("samhquVCRvfiHlCq", CredentialManager.decrypt("c2FtaHF1VkNSdmZpSGxDcQ=="))
# self.assertEqual("KNZXpOaoCnDRWMkY", CredentialManager.decrypt("S05aWHBPYW9DbkRSV01rWQ=="))
# self.assertEqual("BkTWFloxucnxRBVz", CredentialManager.decrypt("QmtUV0Zsb3h1Y254UkJWeg=="))
# self.assertEqual("TWMNDnapuoWgusJw", CredentialManager.decrypt("VFdNTkRuYXB1b1dndXNKdw=="))
# self.assertEqual("YTbAuRetLnZbqckq", CredentialManager.decrypt("WVRiQXVSZXRMblpicWNrcQ=="))
# self.assertEqual("ncxhLZrLaaEUvuvT", CredentialManager.decrypt("bmN4aExackxhYUVVdnV2VA=="))
# self.assertEqual("vjMOBySIKBaksJHa", CredentialManager.decrypt("dmpNT0J5U0lLQmFrc0pIYQ=="))
# self.assertEqual("dwzXqsJQoBXQlkUg", CredentialManager.decrypt("ZHd6WHFzSlFvQlhRbGtVZw=="))
# self.assertEqual("CIfQHNDKbRlgceBm", CredentialManager.decrypt("Q0lmUUhOREtiUmxnY2VCbQ=="))

View File

@@ -1,5 +1,5 @@
{
"ProjectSettings": {
"Project": {
"Name": "unittest_query",
"Version": {
"Major": "2024",
@@ -24,7 +24,7 @@
"Classifiers": [],
"DevDependencies": []
},
"BuildSettings": {
"Build": {
"ProjectType": "library",
"SourcePath": "",
"OutputPath": "../../dist",

View File

@@ -1,5 +1,5 @@
{
"ProjectSettings": {
"Project": {
"Name": "unittest_shared",
"Version": {
"Major": "2024",
@@ -23,7 +23,7 @@
"Classifiers": [],
"DevDependencies": []
},
"BuildSettings": {
"Build": {
"ProjectType": "library",
"SourcePath": "",
"OutputPath": "../../dist",

View File

@@ -1,5 +1,5 @@
{
"ProjectSettings": {
"Project": {
"Name": "unittests_translation",
"Version": {
"Major": "2024",
@@ -26,7 +26,7 @@
"PythonPath": {},
"Classifiers": []
},
"BuildSettings": {
"Build": {
"ProjectType": "unittest",
"SourcePath": "",
"OutputPath": "../../dist",