Compare commits

..

15 Commits

Author SHA1 Message Date
1a67318091 Config model options handling. Closes #185
All checks were successful
Build on push / prepare (push) Successful in 10s
Build on push / core (push) Successful in 19s
Build on push / query (push) Successful in 19s
Build on push / dependency (push) Successful in 25s
Build on push / translation (push) Successful in 17s
Build on push / database (push) Successful in 20s
Build on push / application (push) Successful in 21s
Build on push / mail (push) Successful in 20s
Build on push / auth (push) Successful in 14s
2025-09-19 17:47:49 +02:00
2be58f6577 Introduced fernet to credential manager. Closes #183
All checks were successful
Build on push / prepare (push) Successful in 10s
Build on push / core (push) Successful in 19s
Build on push / query (push) Successful in 22s
Build on push / dependency (push) Successful in 15s
Build on push / application (push) Successful in 20s
Build on push / database (push) Successful in 21s
Build on push / translation (push) Successful in 21s
Build on push / mail (push) Successful in 22s
Build on push / auth (push) Successful in 18s
2025-09-19 15:01:16 +02:00
9c6078f4fd with_logging & logger level fix
All checks were successful
Build on push / prepare (push) Successful in 8s
Build on push / core (push) Successful in 18s
Build on push / query (push) Successful in 18s
Build on push / dependency (push) Successful in 14s
Build on push / translation (push) Successful in 15s
Build on push / database (push) Successful in 17s
Build on push / application (push) Successful in 18s
Build on push / mail (push) Successful in 20s
Build on push / auth (push) Successful in 18s
2025-09-17 22:18:38 +02:00
dfdc31512d App with extension functions
All checks were successful
Build on push / prepare (push) Successful in 9s
Build on push / core (push) Successful in 17s
Build on push / query (push) Successful in 17s
Build on push / dependency (push) Successful in 14s
Build on push / translation (push) Successful in 14s
Build on push / database (push) Successful in 18s
Build on push / mail (push) Successful in 19s
Build on push / application (push) Successful in 22s
Build on push / auth (push) Successful in 14s
2025-09-17 21:56:47 +02:00
ab7ff7da93 Made startup/app extensions static 2025-09-17 20:54:21 +02:00
41087a838b Removed pass from empty functions
All checks were successful
Build on push / prepare (push) Successful in 9s
Build on push / core (push) Successful in 17s
Build on push / query (push) Successful in 17s
Build on push / dependency (push) Successful in 17s
Build on push / translation (push) Successful in 14s
Build on push / mail (push) Successful in 18s
Build on push / database (push) Successful in 18s
Build on push / application (push) Successful in 24s
Build on push / auth (push) Successful in 14s
2025-09-17 20:49:15 +02:00
836b92ccbf Further console test 2025-09-17 20:44:25 +02:00
8aaba22940 Improved application structure
All checks were successful
Build on push / prepare (push) Successful in 11s
Build on push / core (push) Successful in 22s
Build on push / query (push) Successful in 22s
Build on push / dependency (push) Successful in 20s
Build on push / database (push) Successful in 20s
Build on push / translation (push) Successful in 20s
Build on push / application (push) Successful in 22s
Build on push / mail (push) Successful in 23s
Build on push / auth (push) Successful in 16s
2025-09-17 19:23:14 +02:00
504dc5e188 Added auth & improved database
All checks were successful
Build on push / prepare (push) Successful in 9s
Build on push / query (push) Successful in 18s
Build on push / core (push) Successful in 24s
Build on push / dependency (push) Successful in 17s
Build on push / database (push) Successful in 15s
Build on push / translation (push) Successful in 15s
Build on push / mail (push) Successful in 18s
Build on push / application (push) Successful in 19s
Build on push / auth (push) Successful in 16s
2025-09-17 12:21:32 +02:00
4625b626e6 Added dao base
All checks were successful
Build on push / prepare (push) Successful in 8s
Build on push / query (push) Successful in 16s
Build on push / core (push) Successful in 23s
Build on push / translation (push) Successful in 14s
Build on push / mail (push) Successful in 14s
2025-09-17 00:12:25 +02:00
58dbd3ed1e Cleanup for mysql
All checks were successful
Build on push / prepare (push) Successful in 8s
Build on push / core (push) Successful in 19s
Build on push / query (push) Successful in 18s
Build on push / translation (push) Successful in 15s
Build on push / mail (push) Successful in 17s
2025-09-16 20:21:33 +02:00
cd7dfaf2b4 Fixed spinner thread & remove log thread
All checks were successful
Build on push / prepare (push) Successful in 8s
Build on push / core (push) Successful in 18s
Build on push / query (push) Successful in 19s
Build on push / translation (push) Successful in 18s
Build on push / mail (push) Successful in 18s
2025-09-16 18:41:57 +02:00
8ad3e3bdb4 Removed ConfigModel from_dict
All checks were successful
Build on push / prepare (push) Successful in 8s
Build on push / core (push) Successful in 16s
Build on push / query (push) Successful in 17s
Build on push / mail (push) Successful in 17s
Build on push / translation (push) Successful in 18s
2025-09-16 08:51:56 +02:00
b97bc0a3ed Restructuring
All checks were successful
Build on push / prepare (push) Successful in 8s
Build on push / query (push) Successful in 17s
Build on push / core (push) Successful in 26s
Build on push / translation (push) Successful in 15s
Build on push / mail (push) Successful in 17s
2025-09-16 08:48:07 +02:00
5f25400bcd Updated config & environment 2025-09-16 08:39:00 +02:00
324 changed files with 5217 additions and 4064 deletions

View File

@@ -12,6 +12,20 @@ jobs:
version_suffix: 'dev' version_suffix: 'dev'
secrets: inherit secrets: inherit
application:
uses: ./.gitea/workflows/package.yaml
needs: [ prepare, core, dependency ]
with:
working_directory: src/cpl-application
secrets: inherit
auth:
uses: ./.gitea/workflows/package.yaml
needs: [ prepare, core, dependency, database ]
with:
working_directory: src/cpl-auth
secrets: inherit
core: core:
uses: ./.gitea/workflows/package.yaml uses: ./.gitea/workflows/package.yaml
needs: [prepare] needs: [prepare]
@@ -19,6 +33,27 @@ jobs:
working_directory: src/cpl-core working_directory: src/cpl-core
secrets: inherit secrets: inherit
database:
uses: ./.gitea/workflows/package.yaml
needs: [ prepare, core, dependency ]
with:
working_directory: src/cpl-database
secrets: inherit
dependency:
uses: ./.gitea/workflows/package.yaml
needs: [ prepare, core ]
with:
working_directory: src/cpl-dependency
secrets: inherit
mail:
uses: ./.gitea/workflows/package.yaml
needs: [ prepare, core, dependency ]
with:
working_directory: src/cpl-mail
secrets: inherit
query: query:
uses: ./.gitea/workflows/package.yaml uses: ./.gitea/workflows/package.yaml
needs: [prepare] needs: [prepare]
@@ -28,14 +63,7 @@ jobs:
translation: translation:
uses: ./.gitea/workflows/package.yaml uses: ./.gitea/workflows/package.yaml
needs: [ prepare, core ] needs: [ prepare, core, dependency ]
with: with:
working_directory: src/cpl-translation working_directory: src/cpl-translation
secrets: inherit
mail:
uses: ./.gitea/workflows/package.yaml
needs: [ prepare, core ]
with:
working_directory: src/cpl-mail
secrets: inherit secrets: inherit

1
.gitignore vendored
View File

@@ -113,6 +113,7 @@ venv.bak/
# Custom Environments # Custom Environments
cpl-env/ cpl-env/
.secret
# Spyder project settings # Spyder project settings
.spyderproject .spyderproject

153
README.md
View File

@@ -1,153 +0,0 @@
<h1 align="center">CPL - Common python library</h1>
<!-- Summary -->
<p align="center">
<!-- <img src="" alt="cpl-logo" width="120px" height="120px"/> -->
<br>
<i>
CPL is a development platform for python server applications
<br>using Python.</i>
<br>
</p>
## Table of Contents
<!-- TABLE OF CONTENTS -->
<ol>
<li><a href="#Features">Features</a></li>
<li>
<a href="#getting-started">Getting Started</a>
<ul>
<li><a href="#prerequisites">Prerequisites</a></li>
<li><a href="#installation">Installation</a></li>
</ul>
</li>
<li><a href="#roadmap">Roadmap</a></li>
<li><a href="#contributing">Contributing</a></li>
<li><a href="#license">License</a></li>
<li><a href="#contact">Contact</a></li>
</ol>
## Features
<!-- FEATURE OVERVIEW -->
- Expandle
- Application base
- Standardized application classes
- Application object builder
- Application extension classes
- Startup classes
- Startup extension classes
- Configuration
- Configure via object mapped JSON
- Console argument handling
- Console class for in and output
- Banner
- Spinner
- Options (menu)
- Table
- Write
- Write_at
- Write_line
- Write_line_at
- Dependency injection
- Service lifetimes: singleton, scoped and transient
- Providing of application environment
- Environment (development, staging, testing, production)
- Appname
- Customer
- Hostname
- Runtime directory
- Working directory
- Logging
- Standardized logger
- Log-level (FATAL, ERROR, WARN, INFO, DEBUG & TRACE)
- Mail handling
- Send mails
- Pipe classes
- Convert input
- Utils
- Credential manager
- Encryption via BASE64
- PIP wrapper class based on subprocess
- Run pip commands
- String converter to different variants
- to_lower_case
- to_camel_case
- ...
<!-- GETTING STARTED -->
## Getting Started
[Get started with CPL][quickstart].
### Prerequisites
- Install [python] which includes [Pip installs packages][pip]
### Installation
Install the CPL package
```sh
pip install cpl-core --extra-index-url https://pip.sh-edraft.de
```
Install the CPL CLI
```sh
pip install cpl-cli --extra-index-url https://pip.sh-edraft.de
```
Create workspace:
```sh
cpl new <console|library|unittest> <PROJECT NAME>
```
Run the application:
```sh
cd <PROJECT NAME>
cpl start
```
<!-- ROADMAP -->
## Roadmap
See the [open issues](https://git.sh-edraft.de/sh-edraft.de/sh_cpl/issues) for a list of proposed features (and known issues).
<!-- CONTRIBUTING -->
## Contributing
### Contributing Guidelines
Read through our [contributing guidelines][contributing] to learn about our submission process, coding rules and more.
### Want to Help?
Want to file a bug, contribute some code, or improve documentation? Excellent! Read up on our guidelines for [contributing][contributing].
<!-- LICENSE -->
## License
Distributed under the MIT License. See [LICENSE] for more information.
<!-- CONTACT -->
## Contact
Sven Heidemann - sven.heidemann@sh-edraft.de
Project link: [https://git.sh-edraft.de/sh-edraft.de/sh_common_py_lib](https://git.sh-edraft.de/sh-edraft.de/sh_cpl)
<!-- External LINKS -->
[pip_url]: https://pip.sh-edraft.de
[python]: https://www.python.org/
[pip]: https://pypi.org/project/pip/
<!-- Internal LINKS -->
[project]: https://git.sh-edraft.de/sh-edraft.de/sh_cpl
[quickstart]: https://git.sh-edraft.de/sh-edraft.de/sh_cpl/wiki/quickstart
[contributing]: https://git.sh-edraft.de/sh-edraft.de/sh_cpl/wiki/contributing
[license]: LICENSE

View File

@@ -0,0 +1 @@
from .application_builder import ApplicationBuilder

View File

@@ -1,6 +1,4 @@
from .application_abc import ApplicationABC from .application_abc import ApplicationABC
from .application_builder import ApplicationBuilder
from .application_builder_abc import ApplicationBuilderABC
from .application_extension_abc import ApplicationExtensionABC from .application_extension_abc import ApplicationExtensionABC
from .startup_abc import StartupABC from .startup_abc import StartupABC
from .startup_extension_abc import StartupExtensionABC from .startup_extension_abc import StartupExtensionABC

View File

@@ -0,0 +1,86 @@
from abc import ABC, abstractmethod
from typing import Callable, Self
from cpl.application.host import Host
from cpl.core.console.console import Console
from cpl.core.log import LogSettings
from cpl.core.log.log_level_enum import LogLevel
from cpl.core.log.logger_abc import LoggerABC
from cpl.dependency.service_provider_abc import ServiceProviderABC
def __not_implemented__(package: str, func: Callable):
raise NotImplementedError(f"Package {package} is required to use {func.__name__} method")
class ApplicationABC(ABC):
r"""ABC for the Application class
Parameters:
services: :class:`cpl.dependency.service_provider_abc.ServiceProviderABC`
Contains instances of prepared objects
"""
@abstractmethod
def __init__(self, services: ServiceProviderABC):
self._services = services
@classmethod
def extend(cls, name: str | Callable, func: Callable[[Self], Self]):
r"""Extend the Application with a custom method
Parameters:
name: :class:`str`
Name of the method
func: :class:`Callable[[Self], Self]`
Function that takes the Application as a parameter and returns it
"""
if callable(name):
name = name.__name__
setattr(cls, name, func)
return cls
def with_logging(self, level: LogLevel = None):
if level is None:
from cpl.core.configuration.configuration import Configuration
settings = Configuration.get(LogSettings)
level = settings.level if settings else LogLevel.info
logger = self._services.get_service(LoggerABC)
logger.set_level(level)
def with_permissions(self, *args, **kwargs):
__not_implemented__("cpl-auth", self.with_permissions)
def with_migrations(self, *args, **kwargs):
__not_implemented__("cpl-database", self.with_migrations)
def with_seeders(self, *args, **kwargs):
__not_implemented__("cpl-database", self.with_seeders)
def with_extension(self, func: Callable[[Self, ...], None], *args, **kwargs):
r"""Extend the Application with a custom method
Parameters:
func: :class:`Callable[[Self], Self]`
Function that takes the Application as a parameter and returns it
"""
assert func is not None, "func must not be None"
assert callable(func), "func must be callable"
func(self, *args, **kwargs)
def run(self):
r"""Entry point
Called by custom Application.main
"""
try:
Host.run(self.main)
except KeyboardInterrupt:
Console.close()
@abstractmethod
def main(self): ...

View File

@@ -0,0 +1,10 @@
from abc import ABC, abstractmethod
from cpl.dependency import ServiceProviderABC
class ApplicationExtensionABC(ABC):
@staticmethod
@abstractmethod
def run(services: ServiceProviderABC): ...

View File

@@ -0,0 +1,21 @@
from abc import ABC, abstractmethod
from cpl.dependency.service_collection import ServiceCollection
class StartupABC(ABC):
r"""ABC for the startup class"""
@staticmethod
@abstractmethod
def configure_configuration():
r"""Creates configuration of application"""
@staticmethod
@abstractmethod
def configure_services(service: ServiceCollection):
r"""Creates service provider
Parameter:
services: :class:`cpl.dependency.service_collection`
"""

View File

@@ -0,0 +1,20 @@
from abc import ABC, abstractmethod
from cpl.dependency import ServiceCollection
class StartupExtensionABC(ABC):
r"""ABC for startup extension classes"""
@staticmethod
@abstractmethod
def configure_configuration():
r"""Creates configuration of application"""
@staticmethod
@abstractmethod
def configure_services(services: ServiceCollection):
r"""Creates service provider
Parameter:
services: :class:`cpl.dependency.service_collection`
"""

View File

@@ -0,0 +1,64 @@
import asyncio
from typing import Type, Optional
from cpl.application.abc.application_abc import ApplicationABC
from cpl.application.abc.application_extension_abc import ApplicationExtensionABC
from cpl.application.abc.startup_abc import StartupABC
from cpl.application.abc.startup_extension_abc import StartupExtensionABC
from cpl.application.host import Host
from cpl.dependency.service_collection import ServiceCollection
class ApplicationBuilder:
r"""A builder for constructing an application with configurable services and extensions."""
def __init__(self, app: Type[ApplicationABC]):
assert app is not None, "app must not be None"
assert issubclass(app, ApplicationABC), "app must be an subclass of ApplicationABC or its subclass"
self._app = app if app is not None else ApplicationABC
self._services = ServiceCollection()
self._startup: Optional[StartupABC] = None
self._app_extensions: list[Type[ApplicationExtensionABC]] = []
self._startup_extensions: list[Type[StartupExtensionABC]] = []
self._async_loop = asyncio.get_event_loop()
@property
def services(self) -> ServiceCollection:
return self._services
@property
def service_provider(self):
return self._services.build()
def with_startup(self, startup: Type[StartupABC]) -> "ApplicationBuilder":
self._startup = startup
return self
def with_extension(
self,
extension: Type[ApplicationExtensionABC | StartupExtensionABC],
) -> "ApplicationBuilder":
if (issubclass(extension, ApplicationExtensionABC)) and extension not in self._app_extensions:
self._app_extensions.append(extension)
elif (issubclass(extension, StartupExtensionABC)) and extension not in self._startup_extensions:
self._startup_extensions.append(extension)
return self
def build(self) -> ApplicationABC:
for extension in self._startup_extensions:
Host.run(extension.configure_configuration)
Host.run(extension.configure_services, self._services)
if self._startup is not None:
Host.run(self._startup.configure_configuration)
Host.run(self._startup.configure_services, self._services)
for extension in self._app_extensions:
Host.run(extension.run, self.service_provider)
return self._app(self.service_provider)

View File

@@ -0,0 +1,17 @@
import asyncio
from typing import Callable
class Host:
_loop = asyncio.get_event_loop()
@classmethod
def get_loop(cls):
return cls._loop
@classmethod
def run(cls, func: Callable, *args, **kwargs):
if asyncio.iscoroutinefunction(func):
return cls._loop.run_until_complete(func(*args, **kwargs))
return func(*args, **kwargs)

View File

@@ -0,0 +1,30 @@
[build-system]
requires = ["setuptools>=70.1.0", "wheel>=0.43.0"]
build-backend = "setuptools.build_meta"
[project]
name = "cpl-application"
version = "2024.7.0"
description = "CPL application"
readme ="CPL application package"
requires-python = ">=3.12"
license = { text = "MIT" }
authors = [
{ name = "Sven Heidemann", email = "sven.heidemann@sh-edraft.de" }
]
keywords = ["cpl", "application", "backend", "shared", "library"]
dynamic = ["dependencies", "optional-dependencies"]
[project.urls]
Homepage = "https://www.sh-edraft.de"
[tool.setuptools.packages.find]
where = ["."]
include = ["cpl*"]
[tool.setuptools.dynamic]
dependencies = { file = ["requirements.txt"] }
optional-dependencies.dev = { file = ["requirements.dev.txt"] }

View File

@@ -0,0 +1 @@
black==25.1.0

View File

@@ -0,0 +1,2 @@
cpl-core
cpl-dependency

View File

@@ -0,0 +1,82 @@
from enum import Enum
from typing import Type
from cpl.application.abc import ApplicationABC as _ApplicationABC
from cpl.auth import permission as _permission
from cpl.auth.keycloak.keycloak_admin import KeycloakAdmin as _KeycloakAdmin
from cpl.auth.keycloak.keycloak_client import KeycloakClient as _KeycloakClient
from cpl.dependency.service_collection import ServiceCollection as _ServiceCollection
from .auth_logger import AuthLogger
from .keycloak_settings import KeycloakSettings
from .permission_seeder import PermissionSeeder
def _with_permissions(self: _ApplicationABC, *permissions: Type[Enum]) -> _ApplicationABC:
from cpl.auth.permission.permissions_registry import PermissionsRegistry
for perm in permissions:
PermissionsRegistry.with_enum(perm)
return self
def _add_daos(collection: _ServiceCollection):
from .schema._administration.auth_user_dao import AuthUserDao
from .schema._administration.api_key_dao import ApiKeyDao
from .schema._permission.api_key_permission_dao import ApiKeyPermissionDao
from .schema._permission.permission_dao import PermissionDao
from .schema._permission.role_dao import RoleDao
from .schema._permission.role_permission_dao import RolePermissionDao
from .schema._permission.role_user_dao import RoleUserDao
collection.add_singleton(AuthUserDao)
collection.add_singleton(ApiKeyDao)
collection.add_singleton(ApiKeyPermissionDao)
collection.add_singleton(PermissionDao)
collection.add_singleton(RoleDao)
collection.add_singleton(RolePermissionDao)
collection.add_singleton(RoleUserDao)
def add_auth(collection: _ServiceCollection):
import os
from cpl.core.console import Console
from cpl.database.service.migration_service import MigrationService
from cpl.database.model.server_type import ServerType, ServerTypes
try:
collection.add_singleton(_KeycloakClient)
collection.add_singleton(_KeycloakAdmin)
_add_daos(collection)
provider = collection.build()
migration_service: MigrationService = provider.get_service(MigrationService)
if ServerType.server_type == ServerTypes.POSTGRES:
migration_service.with_directory(
os.path.join(os.path.dirname(os.path.realpath(__file__)), "scripts/postgres")
)
elif ServerType.server_type == ServerTypes.MYSQL:
migration_service.with_directory(os.path.join(os.path.dirname(os.path.realpath(__file__)), "scripts/mysql"))
except ImportError as e:
Console.error("cpl-auth is not installed", str(e))
def add_permission(collection: _ServiceCollection):
from cpl.auth.permission_seeder import PermissionSeeder
from cpl.database.abc.data_seeder_abc import DataSeederABC
from cpl.auth.permission.permissions_registry import PermissionsRegistry
from cpl.auth.permission.permissions import Permissions
try:
collection.add_singleton(DataSeederABC, PermissionSeeder)
PermissionsRegistry.with_enum(Permissions)
except ImportError as e:
from cpl.core.console import Console
Console.error("cpl-auth is not installed", str(e))
_ServiceCollection.with_module(add_auth, __name__)
_ServiceCollection.with_module(add_permission, _permission.__name__)
_ApplicationABC.extend(_ApplicationABC.with_permissions, _with_permissions)

View File

@@ -0,0 +1,8 @@
from cpl.core.log import Logger
from cpl.core.typing import Source
class AuthLogger(Logger):
def __init__(self, source: Source):
Logger.__init__(self, source, "auth")

View File

@@ -0,0 +1,3 @@
from .keycloak_admin import KeycloakAdmin
from .keycloak_client import KeycloakClient
from .keycloak_user import KeycloakUser

View File

@@ -0,0 +1,24 @@
from keycloak import KeycloakAdmin as _KeycloakAdmin, KeycloakOpenIDConnection
from cpl.auth.auth_logger import AuthLogger
from cpl.auth.keycloak_settings import KeycloakSettings
_logger = AuthLogger("keycloak")
class KeycloakAdmin(_KeycloakAdmin):
def __init__(self, settings: KeycloakSettings):
_logger.info("Initializing Keycloak admin")
_connection = KeycloakOpenIDConnection(
server_url=settings.url,
client_id=settings.client_id,
realm_name=settings.realm,
client_secret_key=settings.client_secret,
)
_KeycloakAdmin.__init__(
self,
connection=_connection,
)
self.__connection = _connection

View File

@@ -0,0 +1,26 @@
from keycloak import KeycloakOpenID, KeycloakAdmin, KeycloakOpenIDConnection
from cpl.auth.auth_logger import AuthLogger
from cpl.auth.keycloak_settings import KeycloakSettings
_logger = AuthLogger("keycloak")
class KeycloakClient(KeycloakOpenID):
def __init__(self, settings: KeycloakSettings):
KeycloakOpenID.__init__(
self,
server_url=settings.url,
client_id=settings.client_id,
realm_name=settings.realm,
client_secret_key=settings.client_secret,
)
_logger.info("Initializing Keycloak client")
connection = KeycloakOpenIDConnection(
server_url=settings.url,
client_id=settings.client_id,
realm_name=settings.realm,
client_secret_key=settings.client_secret,
)
self._admin = KeycloakAdmin(connection=connection)

View File

@@ -0,0 +1,36 @@
from cpl.core.utils.get_value import get_value
from cpl.dependency import ServiceProviderABC
class KeycloakUser:
def __init__(self, source: dict):
self._username = get_value(source, "preferred_username", str)
self._email = get_value(source, "email", str)
self._email_verified = get_value(source, "email_verified", bool)
self._name = get_value(source, "name", str)
@property
def username(self) -> str:
return self._username
@property
def email(self) -> str:
return self._email
@property
def email_verified(self) -> bool:
return self._email_verified
@property
def name(self) -> str:
return self._name
# Attrs from keycloak
@property
def id(self) -> str:
from cpl.auth import KeycloakAdmin
keycloak_admin: KeycloakAdmin = ServiceProviderABC.get_global_service(KeycloakAdmin)
return keycloak_admin.get_user_id(self._username)

View File

@@ -0,0 +1,17 @@
from typing import Optional
from cpl.core.configuration.configuration_model_abc import ConfigurationModelABC
class KeycloakSettings(ConfigurationModelABC):
def __init__(
self,
src: Optional[dict] = None,
):
ConfigurationModelABC.__init__(self, src, "KEYCLOAK")
self.option("url", str, required=True)
self.option("client_id", str, required=True)
self.option("realm", str, required=True)
self.option("client_secret", str, required=True)

View File

@@ -0,0 +1,36 @@
from enum import Enum
class Permissions(Enum):
""" """
"""
Administration
"""
# administrator
administrator = "administrator"
# api keys
api_keys = "api_keys"
api_keys_create = "api_keys.create"
api_keys_update = "api_keys.update"
api_keys_delete = "api_keys.delete"
# users
users = "users"
users_create = "users.create"
users_update = "users.update"
users_delete = "users.delete"
# settings
settings = "settings"
settings_update = "settings.update"
"""
Permissions
"""
# roles
roles = "roles"
roles_create = "roles.create"
roles_update = "roles.update"
roles_delete = "roles.delete"

View File

@@ -0,0 +1,24 @@
from enum import Enum
from typing import Type
class PermissionsRegistry:
_permissions: dict[str, str] = {}
@classmethod
def get(cls):
return cls._permissions.keys()
@classmethod
def descriptions(cls):
return {x: cls._permissions[x] for x in cls._permissions if cls._permissions[x] is not None}
@classmethod
def set(cls, permission: str, description: str = None):
cls._permissions[permission] = description
@classmethod
def with_enum(cls, e: Type[Enum]):
perms = [x.value for x in e]
for perm in perms:
cls.set(str(perm))

View File

@@ -0,0 +1,120 @@
from cpl.auth.permission.permissions import Permissions
from cpl.auth.permission.permissions_registry import PermissionsRegistry
from cpl.auth.schema import (
Permission,
Role,
RolePermission,
ApiKey,
ApiKeyPermission,
PermissionDao,
RoleDao,
RolePermissionDao,
ApiKeyDao,
ApiKeyPermissionDao,
)
from cpl.core.utils.get_value import get_value
from cpl.database.abc.data_seeder_abc import DataSeederABC
from cpl.database.db_logger import DBLogger
_logger = DBLogger(__name__)
class PermissionSeeder(DataSeederABC):
def __init__(
self,
permission_dao: PermissionDao,
role_dao: RoleDao,
role_permission_dao: RolePermissionDao,
api_key_dao: ApiKeyDao,
api_key_permission_dao: ApiKeyPermissionDao,
):
DataSeederABC.__init__(self)
self._permission_dao = permission_dao
self._role_dao = role_dao
self._role_permission_dao = role_permission_dao
self._api_key_dao = api_key_dao
self._api_key_permission_dao = api_key_permission_dao
async def seed(self):
permissions = await self._permission_dao.get_all()
possible_permissions = [permission for permission in PermissionsRegistry.get()]
if len(permissions) == len(possible_permissions):
_logger.info("Permissions already existing")
await self._update_missing_descriptions()
return
to_delete = []
for permission in permissions:
if permission.name in possible_permissions:
continue
to_delete.append(permission)
await self._permission_dao.delete_many(to_delete, hard_delete=True)
_logger.warning("Permissions incomplete")
permission_names = [permission.name for permission in permissions]
await self._permission_dao.create_many(
[
Permission(
0,
permission,
get_value(PermissionsRegistry.descriptions(), permission, str),
)
for permission in possible_permissions
if permission not in permission_names
]
)
await self._update_missing_descriptions()
await self._add_missing_to_role()
await self._add_missing_to_api_key()
async def _add_missing_to_role(self):
admin_role = await self._role_dao.find_single_by([{Role.id: 1}, {Role.name: "admin"}])
if admin_role is None:
return
admin_permissions = await self._role_permission_dao.get_by_role_id(admin_role.id, with_deleted=True)
to_assign = [
RolePermission(0, admin_role.id, permission.id)
for permission in await self._permission_dao.get_all()
if permission.id not in [x.permission_id for x in admin_permissions]
]
await self._role_permission_dao.create_many(to_assign)
async def _add_missing_to_api_key(self):
admin_api_key = await self._api_key_dao.find_single_by([{ApiKey.id: 1}, {ApiKey.identifier: "admin"}])
if admin_api_key is None:
return
admin_permissions = await self._api_key_permission_dao.find_by_api_key_id(admin_api_key.id, with_deleted=True)
to_assign = [
ApiKeyPermission(0, admin_api_key.id, permission.id)
for permission in await self._permission_dao.get_all()
if permission.id not in [x.permission_id for x in admin_permissions]
]
await self._api_key_permission_dao.create_many(to_assign)
async def _update_missing_descriptions(self):
permissions = {
permission.name: permission
for permission in await self._permission_dao.find_by([{Permission.description: None}])
}
to_update = []
if len(permissions) == 0:
return
for key in PermissionsRegistry.descriptions():
if key.value not in permissions:
continue
permissions[key.value].description = PermissionsRegistry.descriptions()[key]
to_update.append(permissions[key.value])
if len(to_update) == 0:
return
await self._permission_dao.update_many(to_update)

View File

@@ -0,0 +1,15 @@
from ._administration.api_key import ApiKey
from ._administration.api_key_dao import ApiKeyDao
from ._administration.auth_user import AuthUser
from ._administration.auth_user_dao import AuthUserDao
from ._permission.api_key_permission import ApiKeyPermission
from ._permission.api_key_permission_dao import ApiKeyPermissionDao
from ._permission.permission import Permission
from ._permission.permission_dao import PermissionDao
from ._permission.role import Role
from ._permission.role_dao import RoleDao
from ._permission.role_permission import RolePermission
from ._permission.role_permission_dao import RolePermissionDao
from ._permission.role_user import RoleUser
from ._permission.role_user_dao import RoleUserDao

View File

@@ -0,0 +1,66 @@
import secrets
from datetime import datetime
from typing import Optional, Union
from async_property import async_property
from cpl.auth.permission.permissions import Permissions
from cpl.core.environment.environment import Environment
from cpl.core.log.logger import Logger
from cpl.core.typing import Id, SerialId
from cpl.core.utils.credential_manager import CredentialManager
from cpl.database.abc.db_model_abc import DbModelABC
from cpl.dependency.service_provider_abc import ServiceProviderABC
_logger = Logger(__name__)
class ApiKey(DbModelABC):
def __init__(
self,
id: SerialId,
identifier: str,
key: Union[str, bytes],
deleted: bool = False,
editor_id: Optional[Id] = None,
created: Optional[datetime] = None,
updated: Optional[datetime] = None,
):
DbModelABC.__init__(self, id, deleted, editor_id, created, updated)
self._identifier = identifier
self._key = key
@property
def identifier(self) -> str:
return self._identifier
@property
def key(self) -> str:
return self._key
@property
def plain_key(self) -> str:
return CredentialManager.decrypt(self.key)
@async_property
async def permissions(self):
from cpl.auth.schema._permission.api_key_permission_dao import ApiKeyPermissionDao
apiKeyPermissionDao = ServiceProviderABC.get_global_provider().get_service(ApiKeyPermissionDao)
return [await x.permission for x in await apiKeyPermissionDao.find_by_api_key_id(self.id)]
async def has_permission(self, permission: Permissions) -> bool:
return permission.value in [x.name for x in await self.permissions]
def set_new_api_key(self):
self._key = self.new_key()
@staticmethod
def new_key() -> str:
return CredentialManager.encrypt(f"api_{secrets.token_urlsafe(Environment.get("API_KEY_LENGTH", int, 64))}")
@classmethod
def new(cls, identifier: str) -> "ApiKey":
return ApiKey(0, identifier, cls.new_key())

View File

@@ -0,0 +1,32 @@
from typing import Optional
from cpl.auth.schema._administration.api_key import ApiKey
from cpl.database import TableManager
from cpl.database.abc import DbModelDaoABC
from cpl.database.db_logger import DBLogger
_logger = DBLogger(__name__)
class ApiKeyDao(DbModelDaoABC[ApiKey]):
def __init__(self):
DbModelDaoABC.__init__(self, __name__, ApiKey, TableManager.get("api_keys"))
self.attribute(ApiKey.identifier, str)
self.attribute(ApiKey.key, str, "keystring")
async def get_by_identifier(self, ident: str) -> ApiKey:
result = await self._db.select_map(f"SELECT * FROM {self._table_name} WHERE Identifier = '{ident}'")
return self.to_object(result[0])
async def get_by_key(self, key: str) -> ApiKey:
result = await self._db.select_map(f"SELECT * FROM {self._table_name} WHERE Keystring = '{key}'")
return self.to_object(result[0])
async def find_by_key(self, key: str) -> Optional[ApiKey]:
result = await self._db.select_map(f"SELECT * FROM {self._table_name} WHERE Keystring = '{key}'")
if not result or len(result) == 0:
return None
return self.to_object(result[0])

View File

@@ -0,0 +1,89 @@
import uuid
from datetime import datetime
from typing import Optional
from async_property import async_property
from keycloak import KeycloakGetError
from cpl.auth.keycloak import KeycloakAdmin
from cpl.auth.auth_logger import AuthLogger
from cpl.auth.permission.permissions import Permissions
from cpl.core.typing import SerialId
from cpl.database.abc import DbModelABC
from cpl.dependency import ServiceProviderABC
_logger = AuthLogger(__name__)
class AuthUser(DbModelABC):
def __init__(
self,
id: SerialId,
keycloak_id: str,
deleted: bool = False,
editor_id: Optional[SerialId] = None,
created: Optional[datetime] = None,
updated: Optional[datetime] = None,
):
DbModelABC.__init__(self, id, deleted, editor_id, created, updated)
self._keycloak_id = keycloak_id
@property
def keycloak_id(self) -> str:
return self._keycloak_id
@property
def username(self):
if self._keycloak_id == str(uuid.UUID(int=0)):
return "ANONYMOUS"
try:
keycloak_admin: KeycloakAdmin = ServiceProviderABC.get_global_service(KeycloakAdmin)
return keycloak_admin.get_user(self._keycloak_id).get("username")
except KeycloakGetError as e:
return "UNKNOWN"
except Exception as e:
_logger.error(f"Failed to get user {self._keycloak_id} from Keycloak", e)
return "UNKNOWN"
@property
def email(self):
if self._keycloak_id == str(uuid.UUID(int=0)):
return "ANONYMOUS"
try:
keycloak_admin: KeycloakAdmin = ServiceProviderABC.get_global_service(KeycloakAdmin)
return keycloak_admin.get_user(self._keycloak_id).get("email")
except KeycloakGetError as e:
return "UNKNOWN"
except Exception as e:
_logger.error(f"Failed to get user {self._keycloak_id} from Keycloak", e)
return "UNKNOWN"
@async_property
async def roles(self):
from cpl.auth.schema._permission.role_user_dao import RoleUserDao
role_user_dao: RoleUserDao = ServiceProviderABC.get_global_service(RoleUserDao)
return [await x.role for x in await role_user_dao.get_by_user_id(self.id)]
@async_property
async def permissions(self):
from cpl.auth.schema._administration.auth_user_dao import AuthUserDao
auth_user_dao: AuthUserDao = ServiceProviderABC.get_global_service(AuthUserDao)
return await auth_user_dao.get_permissions(self.id)
async def has_permission(self, permission: Permissions) -> bool:
from cpl.auth.schema._administration.auth_user_dao import AuthUserDao
auth_user_dao: AuthUserDao = ServiceProviderABC.get_global_service(AuthUserDao)
return await auth_user_dao.has_permission(self.id, permission)
async def anonymize(self):
from cpl.auth.schema._administration.auth_user_dao import AuthUserDao
auth_user_dao: AuthUserDao = ServiceProviderABC.get_global_service(AuthUserDao)
self._keycloak_id = str(uuid.UUID(int=0))
await auth_user_dao.update(self)

View File

@@ -0,0 +1,72 @@
from typing import Optional, Union
from cpl.auth.permission.permissions import Permissions
from cpl.auth.schema._administration.auth_user import AuthUser
from cpl.database import TableManager
from cpl.database.abc import DbModelDaoABC
from cpl.database.db_logger import DBLogger
from cpl.database.external_data_temp_table_builder import ExternalDataTempTableBuilder
from cpl.dependency import ServiceProviderABC
_logger = DBLogger(__name__)
class AuthUserDao(DbModelDaoABC[AuthUser]):
def __init__(self):
DbModelDaoABC.__init__(self, __name__, AuthUser, TableManager.get("auth_users"))
self.attribute(AuthUser.keycloak_id, str, aliases=["keycloakId"])
async def get_users():
return [(x.id, x.username, x.email) for x in await self.get_all()]
self.use_external_fields(
ExternalDataTempTableBuilder()
.with_table_name(self._table_name)
.with_field("id", "int", True)
.with_field("username", "text")
.with_field("email", "text")
.with_value_getter(get_users)
)
async def get_by_keycloak_id(self, keycloak_id: str) -> AuthUser:
return await self.get_single_by({AuthUser.keycloak_id: keycloak_id})
async def find_by_keycloak_id(self, keycloak_id: str) -> Optional[AuthUser]:
return await self.find_single_by({AuthUser.keycloak_id: keycloak_id})
async def has_permission(self, user_id: int, permission: Union[Permissions, str]) -> bool:
from cpl.auth.schema._permission.permission_dao import PermissionDao
permission_dao: PermissionDao = ServiceProviderABC.get_global_service(PermissionDao)
p = await permission_dao.get_by_name(permission if isinstance(permission, str) else permission.value)
result = await self._db.select_map(
f"""
SELECT COUNT(*)
FROM permission.role_users ru
JOIN permission.role_permissions rp ON ru.roleId = rp.roleId
WHERE ru.userId = {user_id}
AND rp.permissionId = {p.id}
AND ru.deleted = FALSE
AND rp.deleted = FALSE;
"""
)
if result is None or len(result) == 0:
return False
return result[0]["count"] > 0
async def get_permissions(self, user_id: int) -> list[Permissions]:
result = await self._db.select_map(
f"""
SELECT p.*
FROM permission.permissions p
JOIN permission.role_permissions rp ON p.id = rp.permissionId
JOIN permission.role_users ru ON rp.roleId = ru.roleId
WHERE ru.userId = {user_id}
AND rp.deleted = FALSE
AND ru.deleted = FALSE;
"""
)
return [Permissions(p["name"]) for p in result]

View File

@@ -0,0 +1,46 @@
from datetime import datetime
from typing import Optional
from async_property import async_property
from cpl.core.typing import SerialId
from cpl.database.abc import DbJoinModelABC
from cpl.dependency import ServiceProviderABC
class ApiKeyPermission(DbJoinModelABC):
def __init__(
self,
id: SerialId,
api_key_id: SerialId,
permission_id: SerialId,
deleted: bool = False,
editor_id: Optional[SerialId] = None,
created: Optional[datetime] = None,
updated: Optional[datetime] = None,
):
DbJoinModelABC.__init__(self, api_key_id, permission_id, id, deleted, editor_id, created, updated)
self._api_key_id = api_key_id
self._permission_id = permission_id
@property
def api_key_id(self) -> int:
return self._api_key_id
@async_property
async def api_key(self):
from cpl.auth.schema._administration.api_key_dao import ApiKeyDao
api_key_dao: ApiKeyDao = ServiceProviderABC.get_global_service(ApiKeyDao)
return await api_key_dao.get_by_id(self._api_key_id)
@property
def permission_id(self) -> int:
return self._permission_id
@async_property
async def permission(self):
from cpl.auth.schema._permission.permission_dao import PermissionDao
permission_dao: PermissionDao = ServiceProviderABC.get_global_service(PermissionDao)
return await permission_dao.get_by_id(self._permission_id)

View File

@@ -0,0 +1,29 @@
from cpl.auth.schema._permission.api_key_permission import ApiKeyPermission
from cpl.database import TableManager
from cpl.database.abc import DbModelDaoABC
from cpl.database.db_logger import DBLogger
_logger = DBLogger(__name__)
class ApiKeyPermissionDao(DbModelDaoABC[ApiKeyPermission]):
def __init__(self):
DbModelDaoABC.__init__(self, __name__, ApiKeyPermission, TableManager.get("api_key_permissions"))
self.attribute(ApiKeyPermission.api_key_id, int)
self.attribute(ApiKeyPermission.permission_id, int)
async def find_by_api_key_id(self, api_key_id: int, with_deleted=False) -> list[ApiKeyPermission]:
f = [{ApiKeyPermission.api_key_id: api_key_id}]
if not with_deleted:
f.append({ApiKeyPermission.deleted: False})
return await self.find_by(f)
async def find_by_permission_id(self, permission_id: int, with_deleted=False) -> list[ApiKeyPermission]:
f = [{ApiKeyPermission.permission_id: permission_id}]
if not with_deleted:
f.append({ApiKeyPermission.deleted: False})
return await self.find_by(f)

View File

@@ -0,0 +1,37 @@
from datetime import datetime
from typing import Optional
from cpl.core.typing import SerialId
from cpl.database.abc import DbModelABC
class Permission(DbModelABC):
def __init__(
self,
id: SerialId,
name: str,
description: str,
deleted: bool = False,
editor_id: Optional[SerialId] = None,
created: Optional[datetime] = None,
updated: Optional[datetime] = None,
):
DbModelABC.__init__(self, id, deleted, editor_id, created, updated)
self._name = name
self._description = description
@property
def name(self) -> str:
return self._name
@name.setter
def name(self, value: str):
self._name = value
@property
def description(self) -> str:
return self._description
@description.setter
def description(self, value: str):
self._description = value

View File

@@ -0,0 +1,21 @@
from typing import Optional
from cpl.auth.schema._permission.permission import Permission
from cpl.database import TableManager
from cpl.database.abc import DbModelDaoABC
from cpl.database.db_logger import DBLogger
_logger = DBLogger(__name__)
class PermissionDao(DbModelDaoABC[Permission]):
def __init__(self):
DbModelDaoABC.__init__(self, __name__, Permission, TableManager.get("permissions"))
self.attribute(Permission.name, str)
self.attribute(Permission.description, Optional[str])
async def get_by_name(self, name: str) -> Permission:
result = await self._db.select_map(f"SELECT * FROM {self._table_name} WHERE Name = '{name}'")
return self.to_object(result[0])

View File

@@ -0,0 +1,66 @@
from datetime import datetime
from typing import Optional
from async_property import async_property
from cpl.auth.permission.permissions import Permissions
from cpl.core.typing import SerialId
from cpl.database.abc import DbModelABC
from cpl.dependency import ServiceProviderABC
class Role(DbModelABC):
def __init__(
self,
id: SerialId,
name: str,
description: str,
deleted: bool = False,
editor_id: Optional[SerialId] = None,
created: Optional[datetime] = None,
updated: Optional[datetime] = None,
):
DbModelABC.__init__(self, id, deleted, editor_id, created, updated)
self._name = name
self._description = description
@property
def name(self) -> str:
return self._name
@name.setter
def name(self, value: str):
self._name = value
@property
def description(self) -> str:
return self._description
@description.setter
def description(self, value: str):
self._description = value
@async_property
async def permissions(self):
from cpl.auth.schema._permission.role_permission_dao import RolePermissionDao
role_permission_dao: RolePermissionDao = ServiceProviderABC.get_global_service(RolePermissionDao)
return [await x.permission for x in await role_permission_dao.get_by_role_id(self.id)]
@async_property
async def users(self):
from cpl.auth.schema._permission.role_user_dao import RoleUserDao
role_user_dao: RoleUserDao = ServiceProviderABC.get_global_service(RoleUserDao)
return [await x.user for x in await role_user_dao.get_by_role_id(self.id)]
async def has_permission(self, permission: Permissions) -> bool:
from cpl.auth.schema._permission.permission_dao import PermissionDao
from cpl.auth.schema._permission.role_permission_dao import RolePermissionDao
permission_dao: PermissionDao = ServiceProviderABC.get_global_service(PermissionDao)
role_permission_dao: RolePermissionDao = ServiceProviderABC.get_global_service(RolePermissionDao)
p = await permission_dao.get_by_name(permission.value)
return p.id in [x.id for x in await role_permission_dao.get_by_role_id(self.id)]

View File

@@ -0,0 +1,17 @@
from cpl.auth.schema._permission.role import Role
from cpl.database import TableManager
from cpl.database.abc import DbModelDaoABC
from cpl.database.db_logger import DBLogger
_logger = DBLogger(__name__)
class RoleDao(DbModelDaoABC[Role]):
def __init__(self):
DbModelDaoABC.__init__(self, __name__, Role, TableManager.get("roles"))
self.attribute(Role.name, str)
self.attribute(Role.description, str)
async def get_by_name(self, name: str) -> Role:
result = await self._db.select_map(f"SELECT * FROM {self._table_name} WHERE Name = '{name}'")
return self.to_object(result[0])

View File

@@ -0,0 +1,46 @@
from datetime import datetime
from typing import Optional
from async_property import async_property
from cpl.core.typing import SerialId
from cpl.database.abc import DbModelABC
from cpl.dependency import ServiceProviderABC
class RolePermission(DbModelABC):
def __init__(
self,
id: SerialId,
role_id: SerialId,
permission_id: SerialId,
deleted: bool = False,
editor_id: Optional[SerialId] = None,
created: Optional[datetime] = None,
updated: Optional[datetime] = None,
):
DbModelABC.__init__(self, id, deleted, editor_id, created, updated)
self._role_id = role_id
self._permission_id = permission_id
@property
def role_id(self) -> int:
return self._role_id
@async_property
async def role(self):
from cpl.auth.schema._permission.role_dao import RoleDao
role_dao: RoleDao = ServiceProviderABC.get_global_service(RoleDao)
return await role_dao.get_by_id(self._role_id)
@property
def permission_id(self) -> int:
return self._permission_id
@async_property
async def permission(self):
from cpl.auth.schema._permission.permission_dao import PermissionDao
permission_dao: PermissionDao = ServiceProviderABC.get_global_service(PermissionDao)
return await permission_dao.get_by_id(self._permission_id)

View File

@@ -0,0 +1,29 @@
from cpl.auth.schema._permission.role_permission import RolePermission
from cpl.database import TableManager
from cpl.database.abc import DbModelDaoABC
from cpl.database.db_logger import DBLogger
_logger = DBLogger(__name__)
class RolePermissionDao(DbModelDaoABC[RolePermission]):
def __init__(self):
DbModelDaoABC.__init__(self, __name__, RolePermission, TableManager.get("role_permissions"))
self.attribute(RolePermission.role_id, int)
self.attribute(RolePermission.permission_id, int)
async def get_by_role_id(self, role_id: int, with_deleted=False) -> list[RolePermission]:
f = [{RolePermission.role_id: role_id}]
if not with_deleted:
f.append({RolePermission.deleted: False})
return await self.find_by(f)
async def get_by_permission_id(self, permission_id: int, with_deleted=False) -> list[RolePermission]:
f = [{RolePermission.permission_id: permission_id}]
if not with_deleted:
f.append({RolePermission.deleted: False})
return await self.find_by(f)

View File

@@ -0,0 +1,46 @@
from datetime import datetime
from typing import Optional
from async_property import async_property
from cpl.core.typing import SerialId
from cpl.database.abc import DbJoinModelABC
from cpl.dependency import ServiceProviderABC
class RoleUser(DbJoinModelABC):
def __init__(
self,
id: SerialId,
user_id: SerialId,
role_id: SerialId,
deleted: bool = False,
editor_id: Optional[SerialId] = None,
created: Optional[datetime] = None,
updated: Optional[datetime] = None,
):
DbJoinModelABC.__init__(self, id, user_id, role_id, deleted, editor_id, created, updated)
self._user_id = user_id
self._role_id = role_id
@property
def user_id(self) -> int:
return self._user_id
@async_property
async def user(self):
from cpl.auth.schema._administration.auth_user_dao import AuthUserDao
auth_user_dao: AuthUserDao = ServiceProviderABC.get_global_service(AuthUserDao)
return await auth_user_dao.get_by_id(self._user_id)
@property
def role_id(self) -> int:
return self._role_id
@async_property
async def role(self):
from cpl.auth.schema._permission.role_dao import RoleDao
role_dao: RoleDao = ServiceProviderABC.get_global_service(RoleDao)
return await role_dao.get_by_id(self._role_id)

View File

@@ -0,0 +1,29 @@
from cpl.auth.schema._permission.role_user import RoleUser
from cpl.database import TableManager
from cpl.database.abc import DbModelDaoABC
from cpl.database.db_logger import DBLogger
_logger = DBLogger(__name__)
class RoleUserDao(DbModelDaoABC[RoleUser]):
def __init__(self):
DbModelDaoABC.__init__(self, __name__, RoleUser, TableManager.get("role_users"))
self.attribute(RoleUser.role_id, int)
self.attribute(RoleUser.user_id, int)
async def get_by_role_id(self, rid: int, with_deleted=False) -> list[RoleUser]:
f = [{RoleUser.role_id: rid}]
if not with_deleted:
f.append({RoleUser.deleted: False})
return await self.find_by(f)
async def get_by_user_id(self, uid: int, with_deleted=False) -> list[RoleUser]:
f = [{RoleUser.user_id: uid}]
if not with_deleted:
f.append({RoleUser.deleted: False})
return await self.find_by(f)

View File

@@ -0,0 +1,44 @@
CREATE TABLE IF NOT EXISTS administration_auth_users
(
id INT AUTO_INCREMENT PRIMARY KEY,
keycloakId CHAR(36) NOT NULL,
-- for history
deleted BOOL NOT NULL DEFAULT FALSE,
editorId INT NULL,
created TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
CONSTRAINT UC_KeycloakId UNIQUE (keycloakId),
CONSTRAINT FK_EditorId FOREIGN KEY (editorId) REFERENCES administration_auth_users (id)
);
CREATE TABLE IF NOT EXISTS administration_auth_users_history
(
id INT AUTO_INCREMENT PRIMARY KEY,
keycloakId CHAR(36) NOT NULL,
-- for history
deleted BOOL NOT NULL,
editorId INT NULL,
created TIMESTAMP NOT NULL,
updated TIMESTAMP NOT NULL
);
CREATE TRIGGER TR_administration_auth_usersUpdate
AFTER UPDATE
ON administration_auth_users
FOR EACH ROW
BEGIN
INSERT INTO administration_auth_users_history
(id, keycloakId, deleted, editorId, created, updated)
VALUES (OLD.id, OLD.keycloakId, OLD.deleted, OLD.editorId, OLD.created, NOW());
END;
CREATE TRIGGER TR_administration_auth_usersDelete
AFTER DELETE
ON administration_auth_users
FOR EACH ROW
BEGIN
INSERT INTO administration_auth_users_history
(id, keycloakId, deleted, editorId, created, updated)
VALUES (OLD.id, OLD.keycloakId, 1, OLD.editorId, OLD.created, NOW());
END;

View File

@@ -0,0 +1,46 @@
CREATE TABLE IF NOT EXISTS administration_api_keys
(
id INT AUTO_INCREMENT PRIMARY KEY,
identifier VARCHAR(255) NOT NULL,
keyString VARCHAR(255) NOT NULL,
deleted BOOL NOT NULL DEFAULT FALSE,
editorId INT NULL,
created TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
CONSTRAINT UC_Identifier_Key UNIQUE (identifier, keyString),
CONSTRAINT UC_Key UNIQUE (keyString),
CONSTRAINT FK_ApiKeys_Editor FOREIGN KEY (editorId) REFERENCES administration_auth_users (id)
);
CREATE TABLE IF NOT EXISTS administration_api_keys_history
(
id INT AUTO_INCREMENT PRIMARY KEY,
identifier VARCHAR(255) NOT NULL,
keyString VARCHAR(255) NOT NULL,
deleted BOOL NOT NULL,
editorId INT NULL,
created TIMESTAMP NOT NULL,
updated TIMESTAMP NOT NULL
);
CREATE TRIGGER TR_ApiKeysUpdate
AFTER UPDATE
ON administration_api_keys
FOR EACH ROW
BEGIN
INSERT INTO administration_api_keys_history
(id, identifier, keyString, deleted, editorId, created, updated)
VALUES (OLD.id, OLD.identifier, OLD.keyString, OLD.deleted, OLD.editorId, OLD.created, NOW());
END;
CREATE TRIGGER TR_ApiKeysDelete
AFTER DELETE
ON administration_api_keys
FOR EACH ROW
BEGIN
INSERT INTO administration_api_keys_history
(id, identifier, keyString, deleted, editorId, created, updated)
VALUES (OLD.id, OLD.identifier, OLD.keyString, 1, OLD.editorId, OLD.created, NOW());
END;

View File

@@ -0,0 +1,179 @@
CREATE TABLE IF NOT EXISTS permission_permissions
(
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL,
description TEXT NULL,
deleted BOOL NOT NULL DEFAULT FALSE,
editorId INT NULL,
created TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
CONSTRAINT UQ_PermissionName UNIQUE (name),
CONSTRAINT FK_Permissions_Editor FOREIGN KEY (editorId) REFERENCES administration_auth_users (id)
);
CREATE TABLE IF NOT EXISTS permission_permissions_history
(
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL,
description TEXT NULL,
deleted BOOL NOT NULL,
editorId INT NULL,
created TIMESTAMP NOT NULL,
updated TIMESTAMP NOT NULL
);
CREATE TRIGGER TR_PermissionsUpdate
AFTER UPDATE
ON permission_permissions
FOR EACH ROW
BEGIN
INSERT INTO permission_permissions_history
(id, name, description, deleted, editorId, created, updated)
VALUES (OLD.id, OLD.name, OLD.description, OLD.deleted, OLD.editorId, OLD.created, NOW());
END;
CREATE TRIGGER TR_PermissionsDelete
AFTER DELETE
ON permission_permissions
FOR EACH ROW
BEGIN
INSERT INTO permission_permissions_history
(id, name, description, deleted, editorId, created, updated)
VALUES (OLD.id, OLD.name, OLD.description, 1, OLD.editorId, OLD.created, NOW());
END;
CREATE TABLE IF NOT EXISTS permission_roles
(
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL,
description TEXT NULL,
deleted BOOL NOT NULL DEFAULT FALSE,
editorId INT NULL,
created TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
CONSTRAINT UQ_RoleName UNIQUE (name),
CONSTRAINT FK_Roles_Editor FOREIGN KEY (editorId) REFERENCES administration_auth_users (id)
);
CREATE TABLE IF NOT EXISTS permission_roles_history
(
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL,
description TEXT NULL,
deleted BOOL NOT NULL,
editorId INT NULL,
created TIMESTAMP NOT NULL,
updated TIMESTAMP NOT NULL
);
CREATE TRIGGER TR_RolesUpdate
AFTER UPDATE
ON permission_roles
FOR EACH ROW
BEGIN
INSERT INTO permission_roles_history
(id, name, description, deleted, editorId, created, updated)
VALUES (OLD.id, OLD.name, OLD.description, OLD.deleted, OLD.editorId, OLD.created, NOW());
END;
CREATE TRIGGER TR_RolesDelete
AFTER DELETE
ON permission_roles
FOR EACH ROW
BEGIN
INSERT INTO permission_roles_history
(id, name, description, deleted, editorId, created, updated)
VALUES (OLD.id, OLD.name, OLD.description, 1, OLD.editorId, OLD.created, NOW());
END;
CREATE TABLE IF NOT EXISTS permission_role_permissions
(
id INT AUTO_INCREMENT PRIMARY KEY,
RoleId INT NOT NULL,
permissionId INT NOT NULL,
deleted BOOL NOT NULL DEFAULT FALSE,
editorId INT NULL,
created TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
CONSTRAINT UQ_RolePermission UNIQUE (RoleId, permissionId),
CONSTRAINT FK_RolePermissions_Role FOREIGN KEY (RoleId) REFERENCES permission_roles (id) ON DELETE CASCADE,
CONSTRAINT FK_RolePermissions_Permission FOREIGN KEY (permissionId) REFERENCES permission_permissions (id) ON DELETE CASCADE,
CONSTRAINT FK_RolePermissions_Editor FOREIGN KEY (editorId) REFERENCES administration_auth_users (id)
);
CREATE TABLE IF NOT EXISTS permission_role_permissions_history
(
id INT AUTO_INCREMENT PRIMARY KEY,
RoleId INT NOT NULL,
permissionId INT NOT NULL,
deleted BOOL NOT NULL,
editorId INT NULL,
created TIMESTAMP NOT NULL,
updated TIMESTAMP NOT NULL
);
CREATE TRIGGER TR_RolePermissionsUpdate
AFTER UPDATE
ON permission_role_permissions
FOR EACH ROW
BEGIN
INSERT INTO permission_role_permissions_history
(id, RoleId, permissionId, deleted, editorId, created, updated)
VALUES (OLD.id, OLD.RoleId, OLD.permissionId, OLD.deleted, OLD.editorId, OLD.created, NOW());
END;
CREATE TRIGGER TR_RolePermissionsDelete
AFTER DELETE
ON permission_role_permissions
FOR EACH ROW
BEGIN
INSERT INTO permission_role_permissions_history
(id, RoleId, permissionId, deleted, editorId, created, updated)
VALUES (OLD.id, OLD.RoleId, OLD.permissionId, 1, OLD.editorId, OLD.created, NOW());
END;
CREATE TABLE IF NOT EXISTS permission_role_auth_users
(
id INT AUTO_INCREMENT PRIMARY KEY,
RoleId INT NOT NULL,
UserId INT NOT NULL,
deleted BOOL NOT NULL DEFAULT FALSE,
editorId INT NULL,
created TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
CONSTRAINT UQ_RoleUser UNIQUE (RoleId, UserId),
CONSTRAINT FK_Roleauth_users_Role FOREIGN KEY (RoleId) REFERENCES permission_roles (id) ON DELETE CASCADE,
CONSTRAINT FK_Roleauth_users_User FOREIGN KEY (UserId) REFERENCES administration_auth_users (id) ON DELETE CASCADE,
CONSTRAINT FK_Roleauth_users_Editor FOREIGN KEY (editorId) REFERENCES administration_auth_users (id)
);
CREATE TABLE IF NOT EXISTS permission_role_auth_users_history
(
id INT AUTO_INCREMENT PRIMARY KEY,
RoleId INT NOT NULL,
UserId INT NOT NULL,
deleted BOOL NOT NULL,
editorId INT NULL,
created TIMESTAMP NOT NULL,
updated TIMESTAMP NOT NULL
);
CREATE TRIGGER TR_Roleauth_usersUpdate
AFTER UPDATE
ON permission_role_auth_users
FOR EACH ROW
BEGIN
INSERT INTO permission_role_auth_users_history
(id, RoleId, UserId, deleted, editorId, created, updated)
VALUES (OLD.id, OLD.RoleId, OLD.UserId, OLD.deleted, OLD.editorId, OLD.created, NOW());
END;
CREATE TRIGGER TR_Roleauth_usersDelete
AFTER DELETE
ON permission_role_auth_users
FOR EACH ROW
BEGIN
INSERT INTO permission_role_auth_users_history
(id, RoleId, UserId, deleted, editorId, created, updated)
VALUES (OLD.id, OLD.RoleId, OLD.UserId, 1, OLD.editorId, OLD.created, NOW());
END;

View File

@@ -0,0 +1,46 @@
CREATE TABLE IF NOT EXISTS permission_api_key_permissions
(
id INT AUTO_INCREMENT PRIMARY KEY,
apiKeyId INT NOT NULL,
permissionId INT NOT NULL,
deleted BOOL NOT NULL DEFAULT FALSE,
editorId INT NULL,
created TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
CONSTRAINT UQ_ApiKeyPermission UNIQUE (apiKeyId, permissionId),
CONSTRAINT FK_ApiKeyPermissions_ApiKey FOREIGN KEY (apiKeyId) REFERENCES administration_api_keys (id) ON DELETE CASCADE,
CONSTRAINT FK_ApiKeyPermissions_Permission FOREIGN KEY (permissionId) REFERENCES permission_permissions (id) ON DELETE CASCADE,
CONSTRAINT FK_ApiKeyPermissions_Editor FOREIGN KEY (editorId) REFERENCES administration_auth_users (id)
);
CREATE TABLE IF NOT EXISTS permission_api_key_permissions_history
(
id INT AUTO_INCREMENT PRIMARY KEY,
apiKeyId INT NOT NULL,
permissionId INT NOT NULL,
deleted BOOL NOT NULL,
editorId INT NULL,
created TIMESTAMP NOT NULL,
updated TIMESTAMP NOT NULL
);
CREATE TRIGGER TR_ApiKeyPermissionsUpdate
AFTER UPDATE
ON permission_api_key_permissions
FOR EACH ROW
BEGIN
INSERT INTO permission_api_key_permissions_history
(id, apiKeyId, permissionId, deleted, editorId, created, updated)
VALUES (OLD.id, OLD.apiKeyId, OLD.permissionId, OLD.deleted, OLD.editorId, OLD.created, NOW());
END;
CREATE TRIGGER TR_ApiKeyPermissionsDelete
AFTER DELETE
ON permission_api_key_permissions
FOR EACH ROW
BEGIN
INSERT INTO permission_api_key_permissions_history
(id, apiKeyId, permissionId, deleted, editorId, created, updated)
VALUES (OLD.id, OLD.apiKeyId, OLD.permissionId, 1, OLD.editorId, OLD.created, NOW());
END;

View File

@@ -0,0 +1,26 @@
CREATE SCHEMA IF NOT EXISTS administration;
CREATE TABLE IF NOT EXISTS administration.auth_users
(
id SERIAL PRIMARY KEY,
keycloakId UUID NOT NULL,
-- for history
deleted BOOLEAN NOT NULL DEFAULT FALSE,
editorId INT NULL REFERENCES administration.auth_users (id),
created timestamptz NOT NULL DEFAULT NOW(),
updated timestamptz NOT NULL DEFAULT NOW(),
CONSTRAINT UC_KeycloakId UNIQUE (keycloakId)
);
CREATE TABLE IF NOT EXISTS administration.auth_users_history
(
LIKE administration.auth_users
);
CREATE TRIGGER users_history_trigger
BEFORE INSERT OR UPDATE OR DELETE
ON administration.auth_users
FOR EACH ROW
EXECUTE FUNCTION public.history_trigger_function();

View File

@@ -0,0 +1,28 @@
CREATE SCHEMA IF NOT EXISTS administration;
CREATE TABLE IF NOT EXISTS administration.api_keys
(
id SERIAL PRIMARY KEY,
identifier VARCHAR(255) NOT NULL,
keyString VARCHAR(255) NOT NULL,
-- for history
deleted BOOLEAN NOT NULL DEFAULT FALSE,
editorId INT NULL REFERENCES administration.auth_users (id),
created timestamptz NOT NULL DEFAULT NOW(),
updated timestamptz NOT NULL DEFAULT NOW(),
CONSTRAINT UC_Identifier_Key UNIQUE (identifier, keyString),
CONSTRAINT UC_Key UNIQUE (keyString)
);
CREATE TABLE IF NOT EXISTS administration.api_keys_history
(
LIKE administration.api_keys
);
CREATE TRIGGER api_keys_history_trigger
BEFORE INSERT OR UPDATE OR DELETE
ON administration.api_keys
FOR EACH ROW
EXECUTE FUNCTION public.history_trigger_function();

View File

@@ -0,0 +1,105 @@
CREATE SCHEMA IF NOT EXISTS permission;
-- Permissions
CREATE TABLE permission.permissions
(
id SERIAL PRIMARY KEY,
name VARCHAR(255) NOT NULL,
description TEXT NULL,
-- for history
deleted BOOLEAN NOT NULL DEFAULT FALSE,
editorId INT NULL REFERENCES administration.auth_users (id),
created timestamptz NOT NULL DEFAULT NOW(),
updated timestamptz NOT NULL DEFAULT NOW(),
CONSTRAINT UQ_PermissionName UNIQUE (name)
);
CREATE TABLE permission.permissions_history
(
LIKE permission.permissions
);
CREATE TRIGGER versioning_trigger
BEFORE INSERT OR UPDATE OR DELETE
ON permission.permissions
FOR EACH ROW
EXECUTE PROCEDURE public.history_trigger_function();
-- Roles
CREATE TABLE permission.roles
(
id SERIAL PRIMARY KEY,
name VARCHAR(255) NOT NULL,
description TEXT NULL,
-- for history
deleted BOOLEAN NOT NULL DEFAULT FALSE,
editorId INT NULL REFERENCES administration.auth_users (id),
created timestamptz NOT NULL DEFAULT NOW(),
updated timestamptz NOT NULL DEFAULT NOW(),
CONSTRAINT UQ_RoleName UNIQUE (name)
);
CREATE TABLE permission.roles_history
(
LIKE permission.roles
);
CREATE TRIGGER versioning_trigger
BEFORE INSERT OR UPDATE OR DELETE
ON permission.roles
FOR EACH ROW
EXECUTE PROCEDURE public.history_trigger_function();
-- Role permissions
CREATE TABLE permission.role_permissions
(
id SERIAL PRIMARY KEY,
RoleId INT NOT NULL REFERENCES permission.roles (id) ON DELETE CASCADE,
permissionId INT NOT NULL REFERENCES permission.permissions (id) ON DELETE CASCADE,
-- for history
deleted BOOLEAN NOT NULL DEFAULT FALSE,
editorId INT NULL REFERENCES administration.auth_users (id),
created timestamptz NOT NULL DEFAULT NOW(),
updated timestamptz NOT NULL DEFAULT NOW(),
CONSTRAINT UQ_RolePermission UNIQUE (RoleId, permissionId)
);
CREATE TABLE permission.role_permissions_history
(
LIKE permission.role_permissions
);
CREATE TRIGGER versioning_trigger
BEFORE INSERT OR UPDATE OR DELETE
ON permission.role_permissions
FOR EACH ROW
EXECUTE PROCEDURE public.history_trigger_function();
-- Role user
CREATE TABLE permission.role_users
(
id SERIAL PRIMARY KEY,
RoleId INT NOT NULL REFERENCES permission.roles (id) ON DELETE CASCADE,
UserId INT NOT NULL REFERENCES administration.auth_users (id) ON DELETE CASCADE,
-- for history
deleted BOOLEAN NOT NULL DEFAULT FALSE,
editorId INT NULL REFERENCES administration.auth_users (id),
created timestamptz NOT NULL DEFAULT NOW(),
updated timestamptz NOT NULL DEFAULT NOW(),
CONSTRAINT UQ_RoleUser UNIQUE (RoleId, UserId)
);
CREATE TABLE permission.role_users_history
(
LIKE permission.role_users
);
CREATE TRIGGER versioning_trigger
BEFORE INSERT OR UPDATE OR DELETE
ON permission.role_users
FOR EACH ROW
EXECUTE PROCEDURE public.history_trigger_function();

View File

@@ -0,0 +1,24 @@
CREATE TABLE permission.api_key_permissions
(
id SERIAL PRIMARY KEY,
apiKeyId INT NOT NULL REFERENCES administration.api_keys (id) ON DELETE CASCADE,
permissionId INT NOT NULL REFERENCES permission.permissions (id) ON DELETE CASCADE,
-- for history
deleted BOOLEAN NOT NULL DEFAULT FALSE,
editorId INT NULL REFERENCES administration.auth_users (id),
created timestamptz NOT NULL DEFAULT NOW(),
updated timestamptz NOT NULL DEFAULT NOW(),
CONSTRAINT UQ_ApiKeyPermission UNIQUE (apiKeyId, permissionId)
);
CREATE TABLE permission.api_key_permissions_history
(
LIKE permission.api_key_permissions
);
CREATE TRIGGER versioning_trigger
BEFORE INSERT OR UPDATE OR DELETE
ON permission.api_key_permissions
FOR EACH ROW
EXECUTE PROCEDURE public.history_trigger_function();

View File

@@ -0,0 +1,30 @@
[build-system]
requires = ["setuptools>=70.1.0", "wheel>=0.43.0"]
build-backend = "setuptools.build_meta"
[project]
name = "cpl-auth"
version = "2024.7.0"
description = "CPL auth"
readme ="CPL auth package"
requires-python = ">=3.12"
license = { text = "MIT" }
authors = [
{ name = "Sven Heidemann", email = "sven.heidemann@sh-edraft.de" }
]
keywords = ["cpl", "auth", "backend", "shared", "library"]
dynamic = ["dependencies", "optional-dependencies"]
[project.urls]
Homepage = "https://www.sh-edraft.de"
[tool.setuptools.packages.find]
where = ["."]
include = ["cpl*"]
[tool.setuptools.dynamic]
dependencies = { file = ["requirements.txt"] }
optional-dependencies.dev = { file = ["requirements.dev.txt"] }

View File

@@ -0,0 +1 @@
black==25.1.0

View File

@@ -0,0 +1,4 @@
cpl-core
cpl-dependency
cpl-database
python-keycloak-5.8.1

View File

@@ -1 +0,0 @@

View File

@@ -1,60 +0,0 @@
from abc import ABC, abstractmethod
from typing import Optional
from cpl.core.configuration.configuration_abc import ConfigurationABC
from cpl.core.console.console import Console
from cpl.core.dependency_injection.service_provider_abc import ServiceProviderABC
from cpl.core.environment.application_environment_abc import ApplicationEnvironmentABC
class ApplicationABC(ABC):
r"""ABC for the Application class
Parameters:
config: :class:`cpl.core.configuration.configuration_abc.ConfigurationABC`
Contains object loaded from appsettings
services: :class:`cpl.core.dependency_injection.service_provider_abc.ServiceProviderABC`
Contains instances of prepared objects
"""
@abstractmethod
def __init__(self, config: ConfigurationABC, services: ServiceProviderABC):
self._configuration: Optional[ConfigurationABC] = config
self._environment: Optional[ApplicationEnvironmentABC] = self._configuration.environment
self._services: Optional[ServiceProviderABC] = services
def run(self):
r"""Entry point
Called by custom Application.main
"""
try:
self.configure()
self.main()
except KeyboardInterrupt:
Console.close()
async def run_async(self):
r"""Entry point
Called by custom Application.main
"""
try:
await self.configure()
await self.main()
except KeyboardInterrupt:
Console.close()
@abstractmethod
def configure(self):
r"""Configure the application
Called by :class:`cpl.core.application.application_abc.ApplicationABC.run`
"""
@abstractmethod
def main(self):
r"""Custom entry point
Called by :class:`cpl.core.application.application_abc.ApplicationABC.run`
"""

View File

@@ -1,91 +0,0 @@
from typing import Type, Optional, Callable, Union
from cpl.core.application.application_abc import ApplicationABC
from cpl.core.application.application_builder_abc import ApplicationBuilderABC
from cpl.core.application.application_extension_abc import ApplicationExtensionABC
from cpl.core.application.async_application_extension_abc import AsyncApplicationExtensionABC
from cpl.core.application.async_startup_abc import AsyncStartupABC
from cpl.core.application.async_startup_extension_abc import AsyncStartupExtensionABC
from cpl.core.application.startup_abc import StartupABC
from cpl.core.application.startup_extension_abc import StartupExtensionABC
from cpl.core.configuration.configuration import Configuration
from cpl.core.dependency_injection.service_collection import ServiceCollection
class ApplicationBuilder(ApplicationBuilderABC):
r"""This is class is used to build an object of :class:`cpl.core.application.application_abc.ApplicationABC`
Parameter:
app: Type[:class:`cpl.core.application.application_abc.ApplicationABC`]
Application to build
"""
def __init__(self, app: Type[ApplicationABC]):
ApplicationBuilderABC.__init__(self)
self._app = app
self._startup: Optional[StartupABC | AsyncStartupABC] = None
self._configuration = Configuration()
self._environment = self._configuration.environment
self._services = ServiceCollection(self._configuration)
self._app_extensions: list[Type[ApplicationExtensionABC | AsyncApplicationExtensionABC]] = []
self._startup_extensions: list[Type[StartupExtensionABC | AsyncStartupABC]] = []
def use_startup(self, startup: Type[StartupABC | AsyncStartupABC]) -> "ApplicationBuilder":
self._startup = startup()
return self
def use_extension(
self, extension: Type[ApplicationExtensionABC | AsyncApplicationExtensionABC | StartupExtensionABC | AsyncStartupExtensionABC]
) -> "ApplicationBuilder":
if (issubclass(extension, ApplicationExtensionABC) or issubclass(extension, AsyncApplicationExtensionABC)) and extension not in self._app_extensions:
self._app_extensions.append(extension)
elif (issubclass(extension, StartupExtensionABC) or issubclass(extension, AsyncStartupExtensionABC)) and extension not in self._startup_extensions:
self._startup_extensions.append(extension)
return self
def _build_startup(self):
for ex in self._startup_extensions:
extension = ex()
extension.configure_configuration(self._configuration, self._environment)
extension.configure_services(self._services, self._environment)
if self._startup is not None:
self._startup.configure_configuration(self._configuration, self._environment)
self._startup.configure_services(self._services, self._environment)
async def _build_async_startup(self):
for ex in self._startup_extensions:
extension = ex()
await extension.configure_configuration(self._configuration, self._environment)
await extension.configure_services(self._services, self._environment)
if self._startup is not None:
await self._startup.configure_configuration(self._configuration, self._environment)
await self._startup.configure_services(self._services, self._environment)
def build(self) -> ApplicationABC:
self._build_startup()
config = self._configuration
services = self._services.build_service_provider()
for ex in self._app_extensions:
extension = ex()
extension.run(config, services)
return self._app(config, services)
async def build_async(self) -> ApplicationABC:
await self._build_async_startup()
config = self._configuration
services = self._services.build_service_provider()
for ex in self._app_extensions:
extension = ex()
await extension.run(config, services)
return self._app(config, services)

View File

@@ -1,47 +0,0 @@
from abc import ABC, abstractmethod
from typing import Type
from cpl.core.application.application_abc import ApplicationABC
from cpl.core.application.startup_abc import StartupABC
class ApplicationBuilderABC(ABC):
r"""ABC for the :class:`cpl.core.application.application_builder.ApplicationBuilder`"""
@abstractmethod
def __init__(self, *args):
pass
@abstractmethod
def use_startup(self, startup: Type[StartupABC]):
r"""Sets the custom startup class to use
Parameter:
startup: Type[:class:`cpl.core.application.startup_abc.StartupABC`]
Startup class to use
"""
@abstractmethod
async def use_startup(self, startup: Type[StartupABC]):
r"""Sets the custom startup class to use async
Parameter:
startup: Type[:class:`cpl.core.application.startup_abc.StartupABC`]
Startup class to use
"""
@abstractmethod
def build(self) -> ApplicationABC:
r"""Creates custom application object
Returns:
Object of :class:`cpl.core.application.application_abc.ApplicationABC`
"""
@abstractmethod
async def build_async(self) -> ApplicationABC:
r"""Creates custom application object async
Returns:
Object of :class:`cpl.core.application.application_abc.ApplicationABC`
"""

View File

@@ -1,14 +0,0 @@
from abc import ABC, abstractmethod
from cpl.core.configuration import ConfigurationABC
from cpl.core.dependency_injection import ServiceProviderABC
class ApplicationExtensionABC(ABC):
@abstractmethod
def __init__(self):
pass
@abstractmethod
def run(self, config: ConfigurationABC, services: ServiceProviderABC):
pass

View File

@@ -1,14 +0,0 @@
from abc import ABC, abstractmethod
from cpl.core.configuration import ConfigurationABC
from cpl.core.dependency_injection import ServiceProviderABC
class AsyncApplicationExtensionABC(ABC):
@abstractmethod
def __init__(self):
pass
@abstractmethod
async def run(self, config: ConfigurationABC, services: ServiceProviderABC):
pass

View File

@@ -1,32 +0,0 @@
from abc import ABC, abstractmethod
from cpl.core.configuration.configuration_abc import ConfigurationABC
from cpl.core.dependency_injection.service_collection_abc import ServiceCollectionABC
from cpl.core.dependency_injection.service_provider_abc import ServiceProviderABC
from cpl.core.environment.application_environment_abc import ApplicationEnvironmentABC
class AsyncStartupABC(ABC):
r"""ABC for the startup class"""
@abstractmethod
def __init__(self):
pass
@abstractmethod
async def configure_configuration(self, config: ConfigurationABC, env: ApplicationEnvironmentABC):
r"""Creates configuration of application
Parameter:
config: :class:`cpl.core.configuration.configuration_abc.ConfigurationABC`
env: :class:`cpl.core.environment.application_environment_abc`
"""
@abstractmethod
async def configure_services(self, service: ServiceCollectionABC, env: ApplicationEnvironmentABC):
r"""Creates service provider
Parameter:
services: :class:`cpl.core.dependency_injection.service_collection_abc`
env: :class:`cpl.core.environment.application_environment_abc`
"""

View File

@@ -1,31 +0,0 @@
from abc import ABC, abstractmethod
from cpl.core.configuration.configuration_abc import ConfigurationABC
from cpl.core.dependency_injection.service_collection_abc import ServiceCollectionABC
from cpl.core.environment.application_environment_abc import ApplicationEnvironmentABC
class AsyncStartupExtensionABC(ABC):
r"""ABC for startup extension classes"""
@abstractmethod
def __init__(self):
pass
@abstractmethod
async def configure_configuration(self, config: ConfigurationABC, env: ApplicationEnvironmentABC):
r"""Creates configuration of application
Parameter:
config: :class:`cpl.core.configuration.configuration_abc.ConfigurationABC`
env: :class:`cpl.core.environment.application_environment_abc`
"""
@abstractmethod
async def configure_services(self, service: ServiceCollectionABC, env: ApplicationEnvironmentABC):
r"""Creates service provider
Parameter:
services: :class:`cpl.core.dependency_injection.service_collection_abc`
env: :class:`cpl.core.environment.application_environment_abc`
"""

View File

@@ -1,32 +0,0 @@
from abc import ABC, abstractmethod
from cpl.core.configuration.configuration_abc import ConfigurationABC
from cpl.core.dependency_injection.service_collection_abc import ServiceCollectionABC
from cpl.core.dependency_injection.service_provider_abc import ServiceProviderABC
from cpl.core.environment.application_environment_abc import ApplicationEnvironmentABC
class StartupABC(ABC):
r"""ABC for the startup class"""
@abstractmethod
def __init__(self):
pass
@abstractmethod
def configure_configuration(self, config: ConfigurationABC, env: ApplicationEnvironmentABC):
r"""Creates configuration of application
Parameter:
config: :class:`cpl.core.configuration.configuration_abc.ConfigurationABC`
env: :class:`cpl.core.environment.application_environment_abc`
"""
@abstractmethod
def configure_services(self, service: ServiceCollectionABC, env: ApplicationEnvironmentABC):
r"""Creates service provider
Parameter:
services: :class:`cpl.core.dependency_injection.service_collection_abc`
env: :class:`cpl.core.environment.application_environment_abc`
"""

View File

@@ -1,31 +0,0 @@
from abc import ABC, abstractmethod
from cpl.core.configuration.configuration_abc import ConfigurationABC
from cpl.core.dependency_injection.service_collection_abc import ServiceCollectionABC
from cpl.core.environment.application_environment_abc import ApplicationEnvironmentABC
class StartupExtensionABC(ABC):
r"""ABC for startup extension classes"""
@abstractmethod
def __init__(self):
pass
@abstractmethod
def configure_configuration(self, config: ConfigurationABC, env: ApplicationEnvironmentABC):
r"""Creates configuration of application
Parameter:
config: :class:`cpl.core.configuration.configuration_abc.ConfigurationABC`
env: :class:`cpl.core.environment.application_environment_abc`
"""
@abstractmethod
def configure_services(self, service: ServiceCollectionABC, env: ApplicationEnvironmentABC):
r"""Creates service provider
Parameter:
services: :class:`cpl.core.dependency_injection.service_collection_abc`
env: :class:`cpl.core.environment.application_environment_abc`
"""

View File

@@ -1,12 +1,2 @@
from .argument_abc import ArgumentABC
from .argument_builder import ArgumentBuilder
from .argument_executable_abc import ArgumentExecutableABC
from .argument_type_enum import ArgumentTypeEnum
from .configuration import Configuration from .configuration import Configuration
from .configuration_abc import ConfigurationABC
from .configuration_model_abc import ConfigurationModelABC from .configuration_model_abc import ConfigurationModelABC
from .configuration_variable_name_enum import ConfigurationVariableNameEnum
from .executable_argument import ExecutableArgument
from .flag_argument import FlagArgument
from .validator_abc import ValidatorABC
from .variable_argument import VariableArgument

View File

@@ -1,64 +0,0 @@
from abc import ABC, abstractmethod
from cpl.core.configuration.argument_type_enum import ArgumentTypeEnum
class ArgumentABC(ABC):
@abstractmethod
def __init__(
self,
token: str,
name: str,
aliases: list[str],
prevent_next_executable: bool = False,
console_arguments: list["ArgumentABC"] = None,
):
r"""Representation of an console argument
Parameter:
token: :class:`str`
name: :class:`str`
aliases: list[:class:`str`]
console_arguments: List[:class:`cpl.core.configuration.console_argument.ConsoleArgument`]
"""
self._token = token
self._name = name
self._aliases = aliases
self._prevent_next_executable = prevent_next_executable
self._console_arguments = console_arguments if console_arguments is not None else []
@property
def token(self) -> str:
return self._token
@property
def name(self) -> str:
return self._name
@property
def aliases(self) -> list[str]:
return self._aliases
@property
def prevent_next_executable(self) -> bool:
return self._prevent_next_executable
@property
def console_arguments(self) -> list["ArgumentABC"]:
return self._console_arguments
def add_console_argument(self, arg_type: ArgumentTypeEnum, *args, **kwargs) -> "ArgumentABC":
r"""Creates and adds a console argument to known console arguments
Parameter:
arg_type: :class:`str`
Specifies the specific type of the argument
Returns:
self :class:`cpl.core.configuration.console_argument.ConsoleArgument` not created argument!
"""
from cpl.core.configuration.argument_builder import ArgumentBuilder
argument = ArgumentBuilder.build_argument(arg_type, *args, *kwargs)
self._console_arguments.append(argument)
return self

View File

@@ -1,30 +0,0 @@
from typing import Union
from cpl.core.configuration.argument_type_enum import ArgumentTypeEnum
from cpl.core.configuration.executable_argument import ExecutableArgument
from cpl.core.configuration.flag_argument import FlagArgument
from cpl.core.configuration.variable_argument import VariableArgument
from cpl.core.console import Console
class ArgumentBuilder:
@staticmethod
def build_argument(
arg_type: ArgumentTypeEnum, *args, **kwargs
) -> Union[ExecutableArgument, FlagArgument, VariableArgument]:
argument = None
try:
match arg_type:
case ArgumentTypeEnum.Flag:
argument = FlagArgument(*args, **kwargs)
case ArgumentTypeEnum.Executable:
argument = ExecutableArgument(*args, **kwargs)
case ArgumentTypeEnum.Variable:
argument = VariableArgument(*args, **kwargs)
case _:
Console.error("Invalid argument type")
Console.close()
except TypeError as e:
Console.error(str(e))
Console.close()
return argument

View File

@@ -1,11 +0,0 @@
from abc import ABC, abstractmethod
class ArgumentExecutableABC(ABC):
@abstractmethod
def __init__(self):
pass
@abstractmethod
def run(self, args: list[str]):
pass

View File

@@ -1,7 +0,0 @@
from enum import Enum
class ArgumentTypeEnum(Enum):
Flag = 0
Executable = 1
Variable = 3

View File

@@ -1,65 +1,17 @@
import inspect
import json import json
import os import os
import sys import sys
import traceback from typing import Any
from collections.abc import Callable
from typing import Union, Optional
from cpl.core.configuration.argument_abc import ArgumentABC
from cpl.core.configuration.argument_builder import ArgumentBuilder
from cpl.core.configuration.argument_type_enum import ArgumentTypeEnum
from cpl.core.configuration.configuration_abc import ConfigurationABC
from cpl.core.configuration.configuration_model_abc import ConfigurationModelABC from cpl.core.configuration.configuration_model_abc import ConfigurationModelABC
from cpl.core.configuration.configuration_variable_name_enum import (
ConfigurationVariableNameEnum,
)
from cpl.core.configuration.executable_argument import ExecutableArgument
from cpl.core.configuration.flag_argument import FlagArgument
from cpl.core.configuration.variable_argument import VariableArgument
from cpl.core.console.console import Console from cpl.core.console.console import Console
from cpl.core.console.foreground_color_enum import ForegroundColorEnum from cpl.core.console.foreground_color_enum import ForegroundColorEnum
from cpl.core.dependency_injection.service_provider_abc import ServiceProviderABC from cpl.core.typing import D, T
from cpl.core.environment.application_environment import ApplicationEnvironment
from cpl.core.environment.application_environment_abc import ApplicationEnvironmentABC
from cpl.core.environment.environment_name_enum import EnvironmentNameEnum
from cpl.core.typing import T, R
from cpl.core.utils.json_processor import JSONProcessor
class Configuration(ConfigurationABC): class Configuration:
def __init__(self): _config = {}
r"""Representation of configuration"""
ConfigurationABC.__init__(self)
self._application_environment = ApplicationEnvironment()
self._config: dict[Union[type, str], Union[ConfigurationModelABC, str]] = {}
self._argument_types: list[ArgumentABC] = []
self._additional_arguments: list[str] = []
self._argument_error_function: Optional[Callable] = None
self._handled_args = []
@property
def environment(self) -> ApplicationEnvironmentABC:
return self._application_environment
@property
def additional_arguments(self) -> list[str]:
return self._additional_arguments
@property
def argument_error_function(self) -> Optional[Callable]:
return self._argument_error_function
@argument_error_function.setter
def argument_error_function(self, argument_error_function: Callable):
self._argument_error_function = argument_error_function
@property
def arguments(self) -> list[ArgumentABC]:
return self._argument_types
@staticmethod @staticmethod
def _print_info(message: str): def _print_info(message: str):
@@ -72,7 +24,7 @@ class Configuration(ConfigurationABC):
Info message Info message
""" """
Console.set_foreground_color(ForegroundColorEnum.green) Console.set_foreground_color(ForegroundColorEnum.green)
Console.write_line(f"[CONF] {message}") Console.write_line(f"[CONFIG] {message}")
Console.set_foreground_color(ForegroundColorEnum.default) Console.set_foreground_color(ForegroundColorEnum.default)
@staticmethod @staticmethod
@@ -86,7 +38,7 @@ class Configuration(ConfigurationABC):
Warning message Warning message
""" """
Console.set_foreground_color(ForegroundColorEnum.yellow) Console.set_foreground_color(ForegroundColorEnum.yellow)
Console.write_line(f"[CONF] {message}") Console.write_line(f"[CONFIG] {message}")
Console.set_foreground_color(ForegroundColorEnum.default) Console.set_foreground_color(ForegroundColorEnum.default)
@staticmethod @staticmethod
@@ -100,31 +52,11 @@ class Configuration(ConfigurationABC):
Error message Error message
""" """
Console.set_foreground_color(ForegroundColorEnum.red) Console.set_foreground_color(ForegroundColorEnum.red)
Console.write_line(f"[CONF] {message}") Console.write_line(f"[CONFIG] {message}")
Console.set_foreground_color(ForegroundColorEnum.default) Console.set_foreground_color(ForegroundColorEnum.default)
def _set_variable(self, name: str, value: any): @classmethod
r"""Sets variable to given value def _load_json_file(cls, file: str, output: bool) -> dict:
Parameter:
name: :class:`str`
Name of the variable
value: :class:`any`
Value of the variable
"""
if name == ConfigurationVariableNameEnum.environment.value:
self._application_environment.environment_name = EnvironmentNameEnum(value)
elif name == ConfigurationVariableNameEnum.name.value:
self._application_environment.application_name = value
elif name == ConfigurationVariableNameEnum.customer.value:
self._application_environment.customer = value
else:
self._config[name] = value
def _load_json_file(self, file: str, output: bool) -> dict:
r"""Reads the json file r"""Reads the json file
Parameter: Parameter:
@@ -142,90 +74,21 @@ class Configuration(ConfigurationABC):
# load json # load json
json_cfg = json.load(cfg) json_cfg = json.load(cfg)
if output: if output:
self._print_info(f"Loaded config file: {file}") cls._print_info(f"Loaded config file: {file}")
return json_cfg return json_cfg
except Exception as e: except Exception as e:
self._print_error(f"Cannot load config file: {file}! -> {e}") cls._print_error(f"Cannot load config file: {file}! -> {e}")
return {} return {}
def _parse_arguments( @classmethod
self, def add_json_file(cls, name: str, optional: bool = None, output: bool = True, path: str = None):
executables: list[ArgumentABC],
arg_list: list[str],
args_types: list[ArgumentABC],
):
for i in range(0, len(arg_list)):
arg_str = arg_list[i]
for n in range(0, len(args_types)):
arg = args_types[n]
arg_str_without_token = arg_str
if arg.token != "" and arg.token in arg_str:
arg_str_without_token = arg_str.split(arg.token)[1]
# executable
if isinstance(arg, ExecutableArgument):
if (
arg_str.startswith(arg.token)
and arg_str_without_token == arg.name
or arg_str_without_token in arg.aliases
):
executables.append(arg)
self._handled_args.append(arg_str)
self._parse_arguments(executables, arg_list[i + 1 :], arg.console_arguments)
# variables
elif isinstance(arg, VariableArgument):
arg_str_without_value = arg_str_without_token
if arg.value_token in arg_str_without_value:
arg_str_without_value = arg_str_without_token.split(arg.value_token)[0]
if (
arg_str.startswith(arg.token)
and arg_str_without_value == arg.name
or arg_str_without_value in arg.aliases
):
if arg.value_token != " ":
value = arg_str_without_token.split(arg.value_token)[1]
else:
value = arg_list[i + 1]
self._set_variable(arg.name, value)
self._handled_args.append(arg_str)
self._handled_args.append(value)
self._parse_arguments(executables, arg_list[i + 1 :], arg.console_arguments)
# flags
elif isinstance(arg, FlagArgument):
if (
arg_str.startswith(arg.token)
and arg_str_without_token == arg.name
or arg_str_without_token in arg.aliases
):
if arg_str in self._additional_arguments:
self._additional_arguments.remove(arg_str)
self._additional_arguments.append(arg.name)
self._handled_args.append(arg_str)
self._parse_arguments(executables, arg_list[i + 1 :], arg.console_arguments)
# add left over values to args
if arg_str not in self._additional_arguments and arg_str not in self._handled_args:
self._additional_arguments.append(arg_str)
def add_environment_variables(self, prefix: str):
for env_var in os.environ.keys():
if not env_var.startswith(prefix):
continue
self._set_variable(env_var.replace(prefix, ""), os.environ[env_var])
def add_console_argument(self, argument: ArgumentABC):
self._argument_types.append(argument)
def add_json_file(self, name: str, optional: bool = None, output: bool = True, path: str = None):
if os.path.isabs(name): if os.path.isabs(name):
file_path = name file_path = name
else: else:
path_root = self._application_environment.working_directory from cpl.core.environment import Environment
path_root = Environment.get_cwd()
if path is not None: if path is not None:
path_root = path path_root = path
@@ -237,122 +100,33 @@ class Configuration(ConfigurationABC):
if not os.path.isfile(file_path): if not os.path.isfile(file_path):
if optional is not True: if optional is not True:
if output: if output:
self._print_error(f"File not found: {file_path}") cls._print_error(f"File not found: {file_path}")
sys.exit() sys.exit()
if output: if output:
self._print_warn(__name__, f"Not Loaded config file: {file_path}") cls._print_warn(f"Not Loaded config file: {file_path}")
return None return None
config_from_file = self._load_json_file(file_path, output) config_from_file = cls._load_json_file(file_path, output)
for sub in ConfigurationModelABC.__subclasses__(): for sub in ConfigurationModelABC.__subclasses__():
for key, value in config_from_file.items(): for key, value in config_from_file.items():
if sub.__name__ != key and sub.__name__.replace("Settings", "") != key: if sub.__name__ != key and sub.__name__.replace("Settings", "") != key:
continue continue
configuration = sub() cls.set(sub, sub(value))
from_dict = getattr(configuration, "from_dict", None)
if from_dict is not None and not hasattr(from_dict, "is_base_func"): @classmethod
Console.set_foreground_color(ForegroundColorEnum.yellow) def set(cls, key: Any, value: T):
Console.write_line( if inspect.isclass(key):
f"{sub.__name__}.from_dict is deprecated. Instead, set attributes as typed arguments in __init__. They can be None by default!" key = key.__name__
)
Console.color_reset()
configuration.from_dict(value)
else:
configuration = JSONProcessor.process(sub, value)
self.add_configuration(sub, configuration) cls._config[key] = value
def add_configuration(self, key_type: T, value: any): @classmethod
self._config[key_type] = value def get(cls, key: Any, default: D = None) -> T | D:
if inspect.isclass(key):
key = key.__name__
def create_console_argument( return cls._config.get(key, default)
self,
arg_type: ArgumentTypeEnum,
token: str,
name: str,
aliases: list[str],
*args,
**kwargs,
) -> ArgumentABC:
argument = ArgumentBuilder.build_argument(arg_type, token, name, aliases, *args, **kwargs)
self._argument_types.append(argument)
return argument
def for_each_argument(self, call: Callable):
for arg in self._argument_types:
call(arg)
def get_configuration(self, search_type: T) -> Optional[R]:
if type(search_type) is str:
if search_type == ConfigurationVariableNameEnum.environment.value:
return self._application_environment.environment_name
elif search_type == ConfigurationVariableNameEnum.name.value:
return self._application_environment.application_name
elif search_type == ConfigurationVariableNameEnum.customer.value:
return self._application_environment.customer
if search_type not in self._config:
return None
for config_model in self._config:
if config_model == search_type:
return self._config[config_model]
def parse_console_arguments(self, services: ServiceProviderABC, error: bool = None) -> bool:
# sets environment variables as possible arguments as: --VAR=VALUE
for arg_name in ConfigurationVariableNameEnum.to_list():
self.add_console_argument(VariableArgument("--", str(arg_name).upper(), [str(arg_name).lower()], "="))
success = False
try:
arg_list = sys.argv[1:]
executables: list[ExecutableArgument] = []
self._parse_arguments(executables, arg_list, self._argument_types)
except Exception:
Console.error("An error occurred while parsing arguments.", traceback.format_exc())
sys.exit()
try:
prevent = False
for exe in executables:
if prevent:
continue
if exe.validators is not None:
abort = False
for validator_type in exe.validators:
validator = services.get_service(validator_type)
result = validator.validate()
abort = not result
if abort:
break
if abort:
sys.exit()
cmd = services.get_service(exe.executable_type)
self._handle_pre_or_post_executables(True, exe, services)
self._set_variable("ACTIVE_EXECUTABLE", exe.name)
args = self.get_configuration("ARGS")
if args is not None:
for arg in args.split(" "):
if arg == "":
continue
self._additional_arguments.append(arg)
cmd.run(self._additional_arguments)
self._handle_pre_or_post_executables(False, exe, services)
prevent = exe.prevent_next_executable
success = True
except Exception:
Console.error("An error occurred while executing arguments.", traceback.format_exc())
sys.exit()
return success

View File

@@ -1,140 +0,0 @@
from abc import abstractmethod, ABC
from collections.abc import Callable
from typing import Optional
from cpl.core.configuration.argument_abc import ArgumentABC
from cpl.core.configuration.argument_type_enum import ArgumentTypeEnum
from cpl.core.environment.application_environment_abc import ApplicationEnvironmentABC
from cpl.core.typing import T, R
class ConfigurationABC(ABC):
@abstractmethod
def __init__(self):
r"""ABC for the :class:`cpl.core.configuration.configuration.Configuration`"""
@property
@abstractmethod
def environment(self) -> ApplicationEnvironmentABC:
pass
@property
@abstractmethod
def additional_arguments(self) -> list[str]:
pass
@property
@abstractmethod
def argument_error_function(self) -> Optional[Callable]:
pass
@argument_error_function.setter
@abstractmethod
def argument_error_function(self, argument_error_function: Callable):
pass
@property
@abstractmethod
def arguments(self) -> list[ArgumentABC]:
pass
@abstractmethod
def add_environment_variables(self, prefix: str):
r"""Reads the environment variables
Parameter:
prefix: :class:`str`
Prefix of the variables
"""
@abstractmethod
def add_console_argument(self, argument: ArgumentABC):
r"""Adds console argument to known console arguments
Parameter:
argument: :class:`cpl.core.configuration.console_argument.ConsoleArgumentABC`
Specifies the console argument
"""
@abstractmethod
def add_json_file(self, name: str, optional: bool = None, output: bool = True, path: str = None):
r"""Reads and saves settings from given json file
Parameter:
name: :class:`str`
Name of the file
optional: :class:`str`
Specifies whether an error should occur if the file was not found
output: :class:`bool`
Specifies whether an output should take place
path: :class:`str`
Path in which the file should be stored
"""
@abstractmethod
def add_configuration(self, key_type: T, value: any):
r"""Add configuration object
Parameter:
key_type: :class:`cpl.core.type.T`
Type of the value
value: any
Object of the value
"""
@abstractmethod
def create_console_argument(
self, arg_type: ArgumentTypeEnum, token: str, name: str, aliases: list[str], *args, **kwargs
) -> ArgumentABC:
r"""Creates and adds a console argument to known console arguments
Parameter:
token: :class:`str`
Specifies optional beginning of argument
name :class:`str`
Specifies name of argument
aliases list[:class:`str`]
Specifies possible aliases of name
value_token :class:`str`
Specifies were the value begins
is_value_token_optional :class:`bool`
Specifies if values are optional
runnable: :class:`cpl.core.configuration.console_argument.ConsoleArgumentABC`
Specifies class to run when called if value is not None
Returns:
Object of :class:`cpl.core.configuration.console_argument.ConsoleArgumentABC`
"""
@abstractmethod
def for_each_argument(self, call: Callable):
r"""Iterates through all arguments and calls the call function
Parameter:
call: :class:`Callable`
Call for each argument
"""
@abstractmethod
def get_configuration(self, search_type: T) -> Optional[R]:
r"""Returns value from configuration by given type
Parameter:
search_type: :class:`cpl.core.type.T`
Type to search for
Returns:
Object of Union[:class:`str`, :class:`cpl.core.configuration.configuration_model_abc.ConfigurationModelABC`]
"""
@abstractmethod
def parse_console_arguments(self, services: "ServiceProviderABC", error: bool = None) -> bool:
r"""Reads the console arguments
Parameter:
error: :class:`bool`
Defines is invalid argument error will be shown or not
Returns:
Bool to specify if executables were executed or not.
"""

View File

@@ -1,21 +1,81 @@
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from typing import Optional, Type, Any
from cpl.core.typing import T
def base_func(method): from cpl.core.utils.cast import cast
method.is_base_func = True from cpl.core.utils.get_value import get_value
return method from cpl.core.utils.string import String
class ConfigurationModelABC(ABC): class ConfigurationModelABC(ABC):
r"""
ABC for configuration model classes
"""
@abstractmethod @abstractmethod
def __init__(self): def __init__(
r"""ABC for settings representation""" self,
src: Optional[dict] = None,
env_prefix: Optional[str] = None,
readonly: bool = True,
):
ABC.__init__(self)
@base_func self._src = src or {}
def from_dict(self, settings: dict): self._options: dict[str, Any] = {}
r"""DEPRECATED: Set attributes as typed arguments in __init__ instead. See https://docs.sh-edraft.de/cpl/deprecated.html#ConfigurationModelABC-from_dict-method for further information
Converts attributes to dict
Parameter: self._env_prefix = env_prefix
settings: :class:`dict` self._readonly = readonly
"""
def __setattr__(self, attr: str, value: Any):
if hasattr(self, "_readonly") and self._readonly:
raise AttributeError(f"Cannot set attribute: {attr}. {type(self).__name__} is read-only")
super().__setattr__(attr, value)
def __getattr__(self, attr: str) -> Any:
options = super().__getattribute__("_options")
if attr in options:
return options[attr]
return super().__getattribute__(attr)
def option(self, field: str, cast_type: Type[T], default=None, required=False, from_env=True):
value = None
field_variants = [
field,
String.first_to_upper(field),
String.first_to_lower(field),
String.to_camel_case(field),
String.to_snake_case(field),
String.first_to_upper(String.to_camel_case(field)),
]
value = None
for variant in field_variants:
if variant in self._src:
value = self._src[variant]
break
if value is None and from_env:
from cpl.core.environment import Environment
env_field = field.upper()
if self._env_prefix:
env_field = f"{self._env_prefix}_{env_field}"
value = Environment.get(env_field, cast_type)
if value is None and required:
raise ValueError(f"{field} is required")
elif value is None:
self._options[field] = default
return
self._options[field] = cast(value, cast_type)
def get(self, field: str, default=None) -> Optional[T]:
return get_value(self._src, field, self._options[field].type, default)
def to_dict(self) -> dict:
return {field: self.get(field) for field in self._options.keys()}

View File

@@ -1,11 +0,0 @@
from enum import Enum
class ConfigurationVariableNameEnum(Enum):
environment = "ENVIRONMENT"
name = "NAME"
customer = "CUSTOMER"
@staticmethod
def to_list():
return [var.value for var in ConfigurationVariableNameEnum]

View File

@@ -1,40 +0,0 @@
from typing import Type, Optional
from cpl.core.configuration.argument_executable_abc import ArgumentExecutableABC
from cpl.core.configuration.argument_abc import ArgumentABC
from cpl.core.configuration.validator_abc import ValidatorABC
class ExecutableArgument(ArgumentABC):
def __init__(
self,
token: str,
name: str,
aliases: list[str],
executable: Type[ArgumentExecutableABC],
prevent_next_executable: bool = False,
validators: list[Type[ValidatorABC]] = None,
console_arguments: list["ArgumentABC"] = None,
):
self._executable_type = executable
self._validators = validators
self._executable: Optional[ArgumentExecutableABC] = None
ArgumentABC.__init__(self, token, name, aliases, prevent_next_executable, console_arguments)
@property
def executable_type(self) -> type:
return self._executable_type
def set_executable(self, executable: ArgumentExecutableABC):
self._executable = executable
@property
def validators(self) -> list[Type[ValidatorABC]]:
return self._validators
def run(self, args: list[str]):
r"""Executes runnable if exists"""
if self._executable is None:
return
self._executable.execute(args)

View File

@@ -1,13 +0,0 @@
from cpl.core.configuration.argument_abc import ArgumentABC
class FlagArgument(ArgumentABC):
def __init__(
self,
token: str,
name: str,
aliases: list[str],
prevent_next_executable: bool = False,
console_arguments: list["ArgumentABC"] = None,
):
ArgumentABC.__init__(self, token, name, aliases, prevent_next_executable, console_arguments)

View File

@@ -1,11 +0,0 @@
from abc import ABC, abstractmethod
class ValidatorABC(ABC):
@abstractmethod
def __init__(self):
pass
@abstractmethod
def validate(self) -> bool:
pass

View File

@@ -1,28 +0,0 @@
from cpl.core.configuration.argument_abc import ArgumentABC
class VariableArgument(ArgumentABC):
def __init__(
self,
token: str,
name: str,
aliases: list[str],
value_token: str,
prevent_next_executable: bool = False,
console_arguments: list["ArgumentABC"] = None,
):
self._value_token = value_token
self._value: str = ""
ArgumentABC.__init__(self, token, name, aliases, prevent_next_executable, console_arguments)
@property
def value_token(self) -> str:
return self._value_token
@property
def value(self) -> str:
return self._value
def set_value(self, value: str):
self._value = value

View File

@@ -1,5 +1,4 @@
from .background_color_enum import BackgroundColorEnum from .background_color_enum import BackgroundColorEnum
from .console import Console from .console import Console
from .console_call import ConsoleCall from ._call import ConsoleCall
from .foreground_color_enum import ForegroundColorEnum from .foreground_color_enum import ForegroundColorEnum
from .spinner_thread import SpinnerThread

View File

@@ -1,7 +1,8 @@
import os import os
import sys import sys
import threading import multiprocessing
import time import time
from multiprocessing import Process
from termcolor import colored from termcolor import colored
@@ -9,8 +10,8 @@ from cpl.core.console.background_color_enum import BackgroundColorEnum
from cpl.core.console.foreground_color_enum import ForegroundColorEnum from cpl.core.console.foreground_color_enum import ForegroundColorEnum
class SpinnerThread(threading.Thread): class Spinner(Process):
r"""Thread to show spinner in terminal r"""Process to show spinner in terminal
Parameter: Parameter:
msg_len: :class:`int` msg_len: :class:`int`
@@ -22,7 +23,7 @@ class SpinnerThread(threading.Thread):
""" """
def __init__(self, msg_len: int, foreground_color: ForegroundColorEnum, background_color: BackgroundColorEnum): def __init__(self, msg_len: int, foreground_color: ForegroundColorEnum, background_color: BackgroundColorEnum):
threading.Thread.__init__(self) Process.__init__(self)
self._msg_len = msg_len self._msg_len = msg_len
self._foreground_color = foreground_color self._foreground_color = foreground_color
@@ -50,29 +51,26 @@ class SpinnerThread(threading.Thread):
return color_args return color_args
def run(self) -> None: def run(self) -> None:
r"""Entry point of thread, shows the spinner""" r"""Entry point of process, shows the spinner"""
columns = 0 columns = 0
if sys.platform == "win32": if sys.platform == "win32":
columns = os.get_terminal_size().columns columns = os.get_terminal_size().columns
else: else:
term_rows, term_columns = os.popen("stty size", "r").read().split() values = os.popen("stty size", "r").read().split()
term_rows, term_columns = values if len(values) == 2 else (0, 0)
columns = int(term_columns) columns = int(term_columns)
end_msg = "done" end_msg = "done"
end_msg_pos = columns - self._msg_len - len(end_msg)
if end_msg_pos > 0: padding = columns - self._msg_len - len(end_msg)
print(f'{"" : >{end_msg_pos}}', end="") if padding > 0:
print(f'{"" : >{padding}}', end="")
else: else:
print("", end="") print("", end="")
first = True
spinner = self._spinner() spinner = self._spinner()
while self._is_spinning: while self._is_spinning:
if first: print(colored(f"{next(spinner): >{len(end_msg)}}", *self._get_color_args()), end="")
first = False
print(colored(f"{next(spinner): >{len(end_msg) - 1}}", *self._get_color_args()), end="")
else:
print(colored(f"{next(spinner): >{len(end_msg)}}", *self._get_color_args()), end="")
time.sleep(0.1) time.sleep(0.1)
back = "" back = ""
for i in range(0, len(end_msg)): for i in range(0, len(end_msg)):
@@ -84,9 +82,10 @@ class SpinnerThread(threading.Thread):
if not self._exit: if not self._exit:
print(colored(end_msg, *self._get_color_args()), end="") print(colored(end_msg, *self._get_color_args()), end="")
def stop_spinning(self): def stop(self):
r"""Stops the spinner""" r"""Stops the spinner"""
self._is_spinning = False self._is_spinning = False
super().terminate()
time.sleep(0.1) time.sleep(0.1)
def exit(self): def exit(self):

View File

@@ -10,9 +10,9 @@ from tabulate import tabulate
from termcolor import colored from termcolor import colored
from cpl.core.console.background_color_enum import BackgroundColorEnum from cpl.core.console.background_color_enum import BackgroundColorEnum
from cpl.core.console.console_call import ConsoleCall from cpl.core.console._call import ConsoleCall
from cpl.core.console.foreground_color_enum import ForegroundColorEnum from cpl.core.console.foreground_color_enum import ForegroundColorEnum
from cpl.core.console.spinner_thread import SpinnerThread from cpl.core.console._spinner import Spinner
class Console: class Console:
@@ -464,7 +464,7 @@ class Console:
cls.set_hold_back(True) cls.set_hold_back(True)
spinner = None spinner = None
if not cls._disabled: if not cls._disabled:
spinner = SpinnerThread(len(message), spinner_foreground_color, spinner_background_color) spinner = Spinner(len(message), spinner_foreground_color, spinner_background_color)
spinner.start() spinner.start()
return_value = None return_value = None
@@ -476,7 +476,7 @@ class Console:
cls.close() cls.close()
if spinner is not None: if spinner is not None:
spinner.stop_spinning() spinner.stop()
cls.set_hold_back(False) cls.set_hold_back(False)
cls.set_foreground_color(ForegroundColorEnum.default) cls.set_foreground_color(ForegroundColorEnum.default)

View File

@@ -0,0 +1 @@
from .user_context import set_user, get_user

View File

@@ -0,0 +1,18 @@
from contextvars import ContextVar
from typing import Optional
from cpl.auth.auth_logger import AuthLogger
from cpl.auth.schema._administration.auth_user import AuthUser
_user_context: ContextVar[Optional[AuthUser]] = ContextVar("user", default=None)
_logger = AuthLogger(__name__)
def set_user(user_id: Optional[AuthUser]):
_logger.trace("Setting user context", user_id)
_user_context.set(user_id)
def get_user() -> Optional[AuthUser]:
return _user_context.get()

View File

@@ -1,3 +0,0 @@
from .database_settings_name_enum import DatabaseSettingsNameEnum
from .database_settings import DatabaseSettings
from .table_abc import TableABC

View File

@@ -1,2 +0,0 @@
from .database_connection import DatabaseConnection
from .database_connection_abc import DatabaseConnectionABC

View File

@@ -1,2 +0,0 @@
from .database_context import DatabaseContext
from .database_context_abc import DatabaseContextABC

View File

@@ -1,52 +0,0 @@
from typing import Optional
from cpl.core.database.connection.database_connection import DatabaseConnection
from cpl.core.database.connection.database_connection_abc import DatabaseConnectionABC
from cpl.core.database.context.database_context_abc import DatabaseContextABC
from cpl.core.database.database_settings import DatabaseSettings
from mysql.connector.cursor import MySQLCursorBuffered
class DatabaseContext(DatabaseContextABC):
r"""Representation of the database context
Parameter:
database_settings: :class:`cpl.core.database.database_settings.DatabaseSettings`
"""
def __init__(self):
DatabaseContextABC.__init__(self)
self._db: DatabaseConnectionABC = DatabaseConnection()
self._settings: Optional[DatabaseSettings] = None
@property
def cursor(self) -> MySQLCursorBuffered:
self._ping_and_reconnect()
return self._db.cursor
def _ping_and_reconnect(self):
try:
self._db.server.ping(reconnect=True, attempts=3, delay=5)
except Exception:
# reconnect your cursor as you did in __init__ or wherever
if self._settings is None:
raise Exception("Call DatabaseContext.connect first")
self.connect(self._settings)
def connect(self, database_settings: DatabaseSettings):
if self._settings is None:
self._settings = database_settings
self._db.connect(database_settings)
self.save_changes()
def save_changes(self):
self._ping_and_reconnect()
self._db.server.commit()
def select(self, statement: str) -> list[tuple]:
self._ping_and_reconnect()
self._db.cursor.execute(statement)
return self._db.cursor.fetchall()

View File

@@ -1,40 +0,0 @@
from abc import ABC, abstractmethod
from cpl.core.database.database_settings import DatabaseSettings
from mysql.connector.cursor import MySQLCursorBuffered
class DatabaseContextABC(ABC):
r"""ABC for the :class:`cpl.core.database.context.database_context.DatabaseContext`"""
@abstractmethod
def __init__(self, *args):
pass
@property
@abstractmethod
def cursor(self) -> MySQLCursorBuffered:
pass
@abstractmethod
def connect(self, database_settings: DatabaseSettings):
r"""Connects to a database by connection settings
Parameter:
database_settings :class:`cpl.core.database.database_settings.DatabaseSettings`
"""
@abstractmethod
def save_changes(self):
r"""Saves changes of the database"""
@abstractmethod
def select(self, statement: str) -> list[tuple]:
r"""Runs SQL Statements
Parameter:
statement: :class:`str`
Returns:
list: Fetched list of selected elements
"""

View File

@@ -1,73 +0,0 @@
from typing import Optional
from cpl.core.configuration.configuration_model_abc import ConfigurationModelABC
class DatabaseSettings(ConfigurationModelABC):
r"""Represents settings for the database connection"""
def __init__(
self,
host: str = None,
port: int = 3306,
user: str = None,
password: str = None,
database: str = None,
charset: str = "utf8mb4",
use_unicode: bool = False,
buffered: bool = False,
auth_plugin: str = "caching_sha2_password",
ssl_disabled: bool = False,
):
ConfigurationModelABC.__init__(self)
self._host: Optional[str] = host
self._port: Optional[int] = port
self._user: Optional[str] = user
self._password: Optional[str] = password
self._database: Optional[str] = database
self._charset: Optional[str] = charset
self._use_unicode: Optional[bool] = use_unicode
self._buffered: Optional[bool] = buffered
self._auth_plugin: Optional[str] = auth_plugin
self._ssl_disabled: Optional[bool] = ssl_disabled
@property
def host(self) -> Optional[str]:
return self._host
@property
def port(self) -> Optional[int]:
return self._port
@property
def user(self) -> Optional[str]:
return self._user
@property
def password(self) -> Optional[str]:
return self._password
@property
def database(self) -> Optional[str]:
return self._database
@property
def charset(self) -> Optional[str]:
return self._charset
@property
def use_unicode(self) -> Optional[bool]:
return self._use_unicode
@property
def buffered(self) -> Optional[bool]:
return self._buffered
@property
def auth_plugin(self) -> Optional[str]:
return self._auth_plugin
@property
def ssl_disabled(self) -> Optional[bool]:
return self._ssl_disabled

View File

@@ -1,13 +0,0 @@
from enum import Enum
class DatabaseSettingsNameEnum(Enum):
host = "Host"
port = "Port"
user = "User"
password = "Password"
database = "Database"
charset = "Charset"
use_unicode = "UseUnicode"
buffered = "Buffered"
auth_plugin = "AuthPlugin"

View File

@@ -1,37 +0,0 @@
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Optional
class TableABC(ABC):
@abstractmethod
def __init__(self):
self._created_at: Optional[datetime] = datetime.now().isoformat()
self._modified_at: Optional[datetime] = datetime.now().isoformat()
@property
def created_at(self) -> datetime:
return self._created_at
@property
def modified_at(self) -> datetime:
return self._modified_at
@modified_at.setter
def modified_at(self, value: datetime):
self._modified_at = value
@property
@abstractmethod
def insert_string(self) -> str:
pass
@property
@abstractmethod
def udpate_string(self) -> str:
pass
@property
@abstractmethod
def delete_string(self) -> str:
pass

View File

@@ -1,18 +0,0 @@
from cpl.core.dependency_injection.scope import Scope
from cpl.core.dependency_injection.scope_abc import ScopeABC
from cpl.core.dependency_injection.service_provider_abc import ServiceProviderABC
class ScopeBuilder:
r"""Class to build :class:`cpl.core.dependency_injection.scope.Scope`"""
def __init__(self, service_provider: ServiceProviderABC) -> None:
self._service_provider = service_provider
def build(self) -> ScopeABC:
r"""Returns scope
Returns:
Object of type :class:`cpl.core.dependency_injection.scope.Scope`
"""
return Scope(self._service_provider)

View File

@@ -1,90 +0,0 @@
from abc import abstractmethod, ABC
from typing import Type
from cpl.core.database.context.database_context_abc import DatabaseContextABC
from cpl.core.database.database_settings import DatabaseSettings
from cpl.core.dependency_injection.service_provider_abc import ServiceProviderABC
from cpl.core.typing import T, Source
class ServiceCollectionABC(ABC):
r"""ABC for the class :class:`cpl.core.dependency_injection.service_collection.ServiceCollection`"""
@abstractmethod
def __init__(self):
pass
@abstractmethod
def add_db_context(self, db_context_type: Type[DatabaseContextABC], db_settings: DatabaseSettings):
r"""Adds database context
Parameter:
db_context: Type[:class:`cpl.core.database.context.database_context_abc.DatabaseContextABC`]
Database context
"""
@abstractmethod
def add_logging(self):
r"""Adds the CPL internal logger"""
@abstractmethod
def add_pipes(self):
r"""Adds the CPL internal pipes as transient"""
def add_translation(self):
r"""Adds the CPL translation"""
raise NotImplementedError("You should install and use the cpl-translation package")
def add_mail(self):
r"""Adds the CPL mail"""
raise NotImplementedError("You should install and use the cpl-mail package")
@abstractmethod
def add_transient(self, service_type: T, service: T = None) -> "ServiceCollectionABC":
r"""Adds a service with transient lifetime
Parameter:
service_type: :class:`Type`
Type of the service
service: :class:`Callable`
Object of the service
Returns:
self: :class:`cpl.core.dependency_injection.service_collection_abc.ServiceCollectionABC`
"""
@abstractmethod
def add_scoped(self, service_type: T, service: T = None) -> "ServiceCollectionABC":
r"""Adds a service with scoped lifetime
Parameter:
service_type: :class:`Type`
Type of the service
service: :class:`Callable`
Object of the service
Returns:
self: :class:`cpl.core.dependency_injection.service_collection_abc.ServiceCollectionABC`
"""
@abstractmethod
def add_singleton(self, service_type: T, service: T = None) -> "ServiceCollectionABC":
r"""Adds a service with singleton lifetime
Parameter:
service_type: :class:`Type`
Type of the service
service: :class:`Callable`
Object of the service
Returns:
self: :class:`cpl.core.dependency_injection.service_collection_abc.ServiceCollectionABC`
"""
@abstractmethod
def build_service_provider(self) -> ServiceProviderABC:
r"""Creates instance of the service provider
Returns:
Object of type :class:`cpl.core.dependency_injection.service_provider_abc.ServiceProviderABC`
"""

View File

@@ -1,3 +1,2 @@
from .application_environment_abc import ApplicationEnvironmentABC from .environment_enum import EnvironmentEnum
from .environment_name_enum import EnvironmentNameEnum from .environment import Environment
from .application_environment import ApplicationEnvironment

View File

@@ -1,95 +0,0 @@
import os
from datetime import datetime
from socket import gethostname
from typing import Optional
from cpl.core.environment.application_environment_abc import ApplicationEnvironmentABC
from cpl.core.environment.environment_name_enum import EnvironmentNameEnum
class ApplicationEnvironment(ApplicationEnvironmentABC):
r"""Represents environment of the application
Parameter:
name: :class:`cpl.core.environment.environment_name_enum.EnvironmentNameEnum`
"""
def __init__(self, name: EnvironmentNameEnum = EnvironmentNameEnum.production):
ApplicationEnvironmentABC.__init__(self)
self._environment_name: Optional[EnvironmentNameEnum] = name
self._app_name: Optional[str] = None
self._customer: Optional[str] = None
self._start_time: datetime = datetime.now()
self._end_time: datetime = datetime.now()
self._runtime_directory = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
self._working_directory = os.getcwd()
@property
def environment_name(self) -> str:
return str(self._environment_name.value)
@environment_name.setter
def environment_name(self, environment_name: str):
self._environment_name = EnvironmentNameEnum(environment_name)
@property
def application_name(self) -> str:
return self._app_name if self._app_name is not None else ""
@application_name.setter
def application_name(self, application_name: str):
self._app_name = application_name
@property
def customer(self) -> str:
return self._customer if self._customer is not None else ""
@customer.setter
def customer(self, customer: str):
self._customer = customer
@property
def host_name(self):
return gethostname()
@property
def start_time(self) -> datetime:
return self._start_time
@property
def end_time(self) -> datetime:
return self._end_time
@end_time.setter
def end_time(self, end_time: datetime):
self._end_time = end_time
@property
def date_time_now(self) -> datetime:
return datetime.now()
@property
def working_directory(self) -> str:
return str(self._working_directory)
@property
def runtime_directory(self) -> str:
return str(self._runtime_directory)
def set_runtime_directory(self, runtime_directory: str):
if runtime_directory != "":
self._runtime_directory = runtime_directory
return
self._runtime_directory = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
def set_working_directory(self, working_directory: str):
if working_directory != "":
self._working_directory = working_directory
os.chdir(self._working_directory)
return
self._working_directory = os.path.abspath("./")
os.chdir(self._working_directory)

View File

@@ -1,98 +0,0 @@
from abc import ABC, abstractmethod
from datetime import datetime
class ApplicationEnvironmentABC(ABC):
r"""ABC of the class :class:`cpl.core.environment.application_environment.ApplicationEnvironment`"""
@abstractmethod
def __init__(self):
pass
@property
@abstractmethod
def environment_name(self) -> str:
pass
@environment_name.setter
@abstractmethod
def environment_name(self, environment_name: str):
pass
@property
@abstractmethod
def application_name(self) -> str:
pass
@application_name.setter
@abstractmethod
def application_name(self, application_name: str):
pass
@property
@abstractmethod
def customer(self) -> str:
pass
@customer.setter
@abstractmethod
def customer(self, customer: str):
pass
@property
@abstractmethod
def host_name(self) -> str:
pass
@property
@abstractmethod
def start_time(self) -> datetime:
pass
@start_time.setter
@abstractmethod
def start_time(self, start_time: datetime):
pass
@property
@abstractmethod
def end_time(self):
pass
@end_time.setter
@abstractmethod
def end_time(self, end_time: datetime):
pass
@property
@abstractmethod
def date_time_now(self) -> datetime:
pass
@property
@abstractmethod
def working_directory(self) -> str:
pass
@property
@abstractmethod
def runtime_directory(self) -> str:
pass
@abstractmethod
def set_runtime_directory(self, runtime_directory: str):
r"""Sets the current runtime directory
Parameter:
runtime_directory: :class:`str`
Path of the runtime directory
"""
@abstractmethod
def set_working_directory(self, working_directory: str):
r"""Sets the current working directory
Parameter:
working_directory: :class:`str`
Path of the current working directory
"""

View File

@@ -1,32 +1,68 @@
import os import os
from typing import Optional, Type from socket import gethostname
from typing import Type
from cpl.core.typing import T from cpl.core.environment.environment_enum import EnvironmentEnum
from cpl.core.typing import T, D
from cpl.core.utils.get_value import get_value from cpl.core.utils.get_value import get_value
class Environment: class Environment:
_environment = "production" r"""Represents environment of the application
Parameter:
name: :class:`cpl.core.environment.environment_name_enum.EnvironmentNameEnum`
"""
@classmethod @classmethod
def get_environment(cls): def get_environment(cls):
return cls._environment return cls.get("ENVIRONMENT", str, EnvironmentEnum.production.value)
@classmethod @classmethod
def set_environment(cls, environment: str): def set_environment(cls, environment: str):
if environment not in ["development", "staging", "production"]: assert environment is not None and environment != "", "environment must not be None or empty"
raise ValueError("Invalid environment") assert environment.lower() in [
Environment._environment = environment e.value for e in EnvironmentEnum
], f"environment must be one of {[e.value for e in EnvironmentEnum]}"
cls.set("ENVIRONMENT", environment.lower())
@classmethod
def get_app_name(cls) -> str:
return cls.get("APP_NAME", str)
@classmethod
def set_app_name(cls, app_name: str):
cls.set("APP_NAME", app_name)
@staticmethod @staticmethod
def get(key: str, cast_type: Type[T], default: Optional[T] = None) -> Optional[T]: def get_host_name() -> str:
return gethostname()
@staticmethod
def get_cwd() -> str:
return os.getcwd()
@staticmethod
def set_cwd(working_directory: str):
assert working_directory is not None and working_directory != "", "working_directory must not be None or empty"
os.chdir(working_directory)
@staticmethod
def set(key: str, value: T):
assert key is not None and key != "", "key must not be None or empty"
os.environ[key] = str(value)
@staticmethod
def get(key: str, cast_type: Type[T], default: D = None) -> T | D:
""" """
Get an environment variable and cast it to a specified type. Get an environment variable and cast it to a specified type.
:param str key: The name of the environment variable. :param str key: The name of the environment variable.
:param Type[T] cast_type: A callable to cast the variable's value. :param Type[T] cast_type: A callable to cast the variable's value.
:param Optional[T] default: The default value to return if the variable is not found. Defaults to None.The default value to return if the variable is not found. Defaults to None. :param T default: The default value to return if the variable is not found. Defaults to None.The default value to return if the variable is not found. Defaults to None.
:return: The casted value, or None if the variable is not found. :return: The casted value, or None if the variable is not found.
:rtype: Optional[T] :rtype: T | D
""" """
return get_value(dict(os.environ), key, cast_type, default) return get_value(dict(os.environ), key, cast_type, default)

View File

@@ -1,7 +1,7 @@
from enum import Enum from enum import Enum
class EnvironmentNameEnum(Enum): class EnvironmentEnum(Enum):
production = "production" production = "production"
staging = "staging" staging = "staging"
testing = "testing" testing = "testing"

View File

@@ -1,4 +1,4 @@
from .logger import Logger from .logger import Logger
from .logger_abc import LoggerABC from .logger_abc import LoggerABC
from .log_level_enum import LogLevelEnum from .log_level_enum import LogLevel
from .logging_settings import LogSettings from .logging_settings import LogSettings

View File

@@ -1,94 +0,0 @@
import multiprocessing
import os
from datetime import datetime
from typing import Self
from cpl.core.console import Console
from cpl.core.log.log_level_enum import LogLevelEnum
class LogWriter:
_instance = None
# ANSI color codes for different log levels
_COLORS = {
LogLevelEnum.trace: "\033[37m", # Light Gray
LogLevelEnum.debug: "\033[94m", # Blue
LogLevelEnum.info: "\033[92m", # Green
LogLevelEnum.warning: "\033[93m", # Yellow
LogLevelEnum.error: "\033[91m", # Red
LogLevelEnum.fatal: "\033[95m", # Magenta
}
def __init__(self, file_prefix: str, level: LogLevelEnum = LogLevelEnum.info):
self._file_prefix = file_prefix
self._level = level
self._queue = multiprocessing.Queue()
self._process = multiprocessing.Process(target=self._log_worker, daemon=True)
self._create_log_dir()
self._process.start()
@property
def level(self) -> LogLevelEnum:
return self._level
@level.setter
def level(self, value: LogLevelEnum):
assert isinstance(value, LogLevelEnum), "Log level must be an instance of LogLevelEnum"
self._level = value
@classmethod
def get_instance(cls, file_prefix: str, level: LogLevelEnum = LogLevelEnum.info) -> Self:
if cls._instance is None:
cls._instance = LogWriter(file_prefix, level)
return cls._instance
@staticmethod
def _create_log_dir():
if os.path.exists("logs"):
return
os.makedirs("logs")
def _log_worker(self):
"""Worker process that writes log messages from the queue to the file."""
while True:
content = self._queue.get()
if content is None: # Shutdown signal
break
self._write_log_to_file(content)
Console.write_line(
f"{self._COLORS.get(self._level, '\033[0m')}{content}\033[0m"
)
@property
def log_file(self):
return f"logs/{self._file_prefix}_{datetime.now().strftime('%Y-%m-%d')}.log"
def _ensure_file_size(self):
log_file = self.log_file
if not os.path.exists(log_file) or os.path.getsize(log_file) <= 0.5 * 1024 * 1024:
return
# if exists and size is greater than 300MB, create a new file
os.rename(
log_file,
f"{log_file.split('.log')[0]}_{datetime.now().strftime('%H-%M-%S')}.log",
)
def _write_log_to_file(self, content: str):
self._ensure_file_size()
with open(self.log_file, "a") as log_file:
log_file.write(content + "\n")
log_file.close()
def log(self, content: str):
"""Enqueue log message without blocking main app."""
self._queue.put(content)
def close(self):
"""Gracefully stop the logging process."""
self._queue.put(None)
self._process.join()

View File

@@ -1,7 +1,7 @@
from enum import Enum from enum import Enum
class LogLevelEnum(Enum): class LogLevel(Enum):
off = "OFF" # Nothing off = "OFF" # Nothing
trace = "TRC" # Detailed app information's trace = "TRC" # Detailed app information's
debug = "DEB" # Detailed app state debug = "DEB" # Detailed app state

Some files were not shown because too many files have changed in this diff Show More