Compare commits

..

7 Commits

Author SHA1 Message Date
9c6078f4fd with_logging & logger level fix
All checks were successful
Build on push / prepare (push) Successful in 8s
Build on push / core (push) Successful in 18s
Build on push / query (push) Successful in 18s
Build on push / dependency (push) Successful in 14s
Build on push / translation (push) Successful in 15s
Build on push / database (push) Successful in 17s
Build on push / application (push) Successful in 18s
Build on push / mail (push) Successful in 20s
Build on push / auth (push) Successful in 18s
2025-09-17 22:18:38 +02:00
dfdc31512d App with extension functions
All checks were successful
Build on push / prepare (push) Successful in 9s
Build on push / core (push) Successful in 17s
Build on push / query (push) Successful in 17s
Build on push / dependency (push) Successful in 14s
Build on push / translation (push) Successful in 14s
Build on push / database (push) Successful in 18s
Build on push / mail (push) Successful in 19s
Build on push / application (push) Successful in 22s
Build on push / auth (push) Successful in 14s
2025-09-17 21:56:47 +02:00
ab7ff7da93 Made startup/app extensions static 2025-09-17 20:54:21 +02:00
41087a838b Removed pass from empty functions
All checks were successful
Build on push / prepare (push) Successful in 9s
Build on push / core (push) Successful in 17s
Build on push / query (push) Successful in 17s
Build on push / dependency (push) Successful in 17s
Build on push / translation (push) Successful in 14s
Build on push / mail (push) Successful in 18s
Build on push / database (push) Successful in 18s
Build on push / application (push) Successful in 24s
Build on push / auth (push) Successful in 14s
2025-09-17 20:49:15 +02:00
836b92ccbf Further console test 2025-09-17 20:44:25 +02:00
8aaba22940 Improved application structure
All checks were successful
Build on push / prepare (push) Successful in 11s
Build on push / core (push) Successful in 22s
Build on push / query (push) Successful in 22s
Build on push / dependency (push) Successful in 20s
Build on push / database (push) Successful in 20s
Build on push / translation (push) Successful in 20s
Build on push / application (push) Successful in 22s
Build on push / mail (push) Successful in 23s
Build on push / auth (push) Successful in 16s
2025-09-17 19:23:14 +02:00
504dc5e188 Added auth & improved database
All checks were successful
Build on push / prepare (push) Successful in 9s
Build on push / query (push) Successful in 18s
Build on push / core (push) Successful in 24s
Build on push / dependency (push) Successful in 17s
Build on push / database (push) Successful in 15s
Build on push / translation (push) Successful in 15s
Build on push / mail (push) Successful in 18s
Build on push / application (push) Successful in 19s
Build on push / auth (push) Successful in 16s
2025-09-17 12:21:32 +02:00
139 changed files with 2270 additions and 958 deletions

View File

@@ -12,6 +12,20 @@ jobs:
version_suffix: 'dev'
secrets: inherit
application:
uses: ./.gitea/workflows/package.yaml
needs: [ prepare, core, dependency ]
with:
working_directory: src/cpl-application
secrets: inherit
auth:
uses: ./.gitea/workflows/package.yaml
needs: [ prepare, core, dependency, database ]
with:
working_directory: src/cpl-auth
secrets: inherit
core:
uses: ./.gitea/workflows/package.yaml
needs: [prepare]
@@ -19,6 +33,27 @@ jobs:
working_directory: src/cpl-core
secrets: inherit
database:
uses: ./.gitea/workflows/package.yaml
needs: [ prepare, core, dependency ]
with:
working_directory: src/cpl-database
secrets: inherit
dependency:
uses: ./.gitea/workflows/package.yaml
needs: [ prepare, core ]
with:
working_directory: src/cpl-dependency
secrets: inherit
mail:
uses: ./.gitea/workflows/package.yaml
needs: [ prepare, core, dependency ]
with:
working_directory: src/cpl-mail
secrets: inherit
query:
uses: ./.gitea/workflows/package.yaml
needs: [prepare]
@@ -28,14 +63,7 @@ jobs:
translation:
uses: ./.gitea/workflows/package.yaml
needs: [ prepare, core ]
needs: [ prepare, core, dependency ]
with:
working_directory: src/cpl-translation
secrets: inherit
mail:
uses: ./.gitea/workflows/package.yaml
needs: [ prepare, core ]
with:
working_directory: src/cpl-mail
secrets: inherit

View File

@@ -1,6 +1 @@
from .application_abc import ApplicationABC
from .application_builder import ApplicationBuilder
from .application_builder_abc import ApplicationBuilderABC
from .application_extension_abc import ApplicationExtensionABC
from .startup_abc import StartupABC
from .startup_extension_abc import StartupExtensionABC

View File

@@ -0,0 +1,4 @@
from .application_abc import ApplicationABC
from .application_extension_abc import ApplicationExtensionABC
from .startup_abc import StartupABC
from .startup_extension_abc import StartupExtensionABC

View File

@@ -0,0 +1,82 @@
from abc import ABC, abstractmethod
from typing import Callable, Self
from cpl.application.host import Host
from cpl.core.console.console import Console
from cpl.core.environment import Environment
from cpl.core.log import LoggerABC, LogLevel
from cpl.dependency.service_provider_abc import ServiceProviderABC
def __not_implemented__(package: str, func: Callable):
raise NotImplementedError(f"Package {package} is required to use {func.__name__} method")
class ApplicationABC(ABC):
r"""ABC for the Application class
Parameters:
services: :class:`cpl.dependency.service_provider_abc.ServiceProviderABC`
Contains instances of prepared objects
"""
@abstractmethod
def __init__(self, services: ServiceProviderABC):
self._services = services
@classmethod
def extend(cls, name: str | Callable, func: Callable[[Self], Self]):
r"""Extend the Application with a custom method
Parameters:
name: :class:`str`
Name of the method
func: :class:`Callable[[Self], Self]`
Function that takes the Application as a parameter and returns it
"""
if callable(name):
name = name.__name__
setattr(cls, name, func)
return cls
def with_logging(self, level: LogLevel = None):
if level is None:
level = Environment.get("LOG_LEVEL", LogLevel, LogLevel.info)
logger = self._services.get_service(LoggerABC)
logger.set_level(level)
def with_permissions(self, *args, **kwargs):
__not_implemented__("cpl-auth", self.with_permissions)
def with_migrations(self, *args, **kwargs):
__not_implemented__("cpl-database", self.with_migrations)
def with_seeders(self, *args, **kwargs):
__not_implemented__("cpl-database", self.with_seeders)
def with_extension(self, func: Callable[[Self, ...], None], *args, **kwargs):
r"""Extend the Application with a custom method
Parameters:
func: :class:`Callable[[Self], Self]`
Function that takes the Application as a parameter and returns it
"""
assert func is not None, "func must not be None"
assert callable(func), "func must be callable"
func(self, *args, **kwargs)
def run(self):
r"""Entry point
Called by custom Application.main
"""
try:
Host.run(self.main)
except KeyboardInterrupt:
Console.close()
@abstractmethod
def main(self): ...

View File

@@ -0,0 +1,10 @@
from abc import ABC, abstractmethod
from cpl.dependency import ServiceProviderABC
class ApplicationExtensionABC(ABC):
@staticmethod
@abstractmethod
def run(services: ServiceProviderABC): ...

View File

@@ -3,19 +3,17 @@ from abc import ABC, abstractmethod
from cpl.dependency.service_collection import ServiceCollection
class AsyncStartupABC(ABC):
class StartupABC(ABC):
r"""ABC for the startup class"""
@staticmethod
@abstractmethod
def __init__(self):
pass
@abstractmethod
async def configure_configuration(self):
def configure_configuration():
r"""Creates configuration of application"""
@staticmethod
@abstractmethod
async def configure_services(self, service: ServiceCollection):
def configure_services(service: ServiceCollection):
r"""Creates service provider
Parameter:

View File

@@ -0,0 +1,20 @@
from abc import ABC, abstractmethod
from cpl.dependency import ServiceCollection
class StartupExtensionABC(ABC):
r"""ABC for startup extension classes"""
@staticmethod
@abstractmethod
def configure_configuration():
r"""Creates configuration of application"""
@staticmethod
@abstractmethod
def configure_services(services: ServiceCollection):
r"""Creates service provider
Parameter:
services: :class:`cpl.dependency.service_collection`
"""

View File

@@ -1,57 +0,0 @@
from abc import ABC, abstractmethod
from typing import Optional
from cpl.dependency.service_provider_abc import ServiceProviderABC
from cpl.core.console.console import Console
class ApplicationABC(ABC):
r"""ABC for the Application class
Parameters:
config: :class:`cpl.core.configuration.configuration_abc.ConfigurationABC`
Contains object loaded from appsettings
services: :class:`cpl.dependency.service_provider_abc.ServiceProviderABC`
Contains instances of prepared objects
"""
@abstractmethod
def __init__(self, services: ServiceProviderABC):
self._services: Optional[ServiceProviderABC] = services
def run(self):
r"""Entry point
Called by custom Application.main
"""
try:
self.configure()
self.main()
except KeyboardInterrupt:
Console.close()
async def run_async(self):
r"""Entry point
Called by custom Application.main
"""
try:
await self.configure()
await self.main()
except KeyboardInterrupt:
Console.close()
@abstractmethod
def configure(self):
r"""Configure the application
Called by :class:`cpl.application.application_abc.ApplicationABC.run`
"""
@abstractmethod
def main(self):
r"""Custom entry point
Called by :class:`cpl.application.application_abc.ApplicationABC.run`
"""

View File

@@ -1,97 +1,64 @@
from typing import Type, Optional, Callable, Union
import asyncio
from typing import Type, Optional
from cpl.application.application_abc import ApplicationABC
from cpl.application.application_builder_abc import ApplicationBuilderABC
from cpl.application.application_extension_abc import ApplicationExtensionABC
from cpl.application.async_application_extension_abc import AsyncApplicationExtensionABC
from cpl.application.async_startup_abc import AsyncStartupABC
from cpl.application.async_startup_extension_abc import AsyncStartupExtensionABC
from cpl.application.startup_abc import StartupABC
from cpl.application.startup_extension_abc import StartupExtensionABC
from cpl.core.configuration.configuration import Configuration
from cpl.application.abc.application_abc import ApplicationABC
from cpl.application.abc.application_extension_abc import ApplicationExtensionABC
from cpl.application.abc.startup_abc import StartupABC
from cpl.application.abc.startup_extension_abc import StartupExtensionABC
from cpl.application.host import Host
from cpl.dependency.service_collection import ServiceCollection
from cpl.core.environment import Environment
class ApplicationBuilder(ApplicationBuilderABC):
r"""This is class is used to build an object of :class:`cpl.application.application_abc.ApplicationABC`
Parameter:
app: Type[:class:`cpl.application.application_abc.ApplicationABC`]
Application to build
"""
class ApplicationBuilder:
r"""A builder for constructing an application with configurable services and extensions."""
def __init__(self, app: Type[ApplicationABC]):
ApplicationBuilderABC.__init__(self)
self._app = app
self._startup: Optional[StartupABC | AsyncStartupABC] = None
assert app is not None, "app must not be None"
assert issubclass(app, ApplicationABC), "app must be an subclass of ApplicationABC or its subclass"
self._app = app if app is not None else ApplicationABC
self._services = ServiceCollection()
self._app_extensions: list[Type[ApplicationExtensionABC | AsyncApplicationExtensionABC]] = []
self._startup_extensions: list[Type[StartupExtensionABC | AsyncStartupABC]] = []
self._startup: Optional[StartupABC] = None
self._app_extensions: list[Type[ApplicationExtensionABC]] = []
self._startup_extensions: list[Type[StartupExtensionABC]] = []
def use_startup(self, startup: Type[StartupABC | AsyncStartupABC]) -> "ApplicationBuilder":
self._startup = startup()
self._async_loop = asyncio.get_event_loop()
@property
def services(self) -> ServiceCollection:
return self._services
@property
def service_provider(self):
return self._services.build()
def with_startup(self, startup: Type[StartupABC]) -> "ApplicationBuilder":
self._startup = startup
return self
def use_extension(
def with_extension(
self,
extension: Type[
ApplicationExtensionABC | AsyncApplicationExtensionABC | StartupExtensionABC | AsyncStartupExtensionABC
],
extension: Type[ApplicationExtensionABC | StartupExtensionABC],
) -> "ApplicationBuilder":
if (
issubclass(extension, ApplicationExtensionABC) or issubclass(extension, AsyncApplicationExtensionABC)
) and extension not in self._app_extensions:
if (issubclass(extension, ApplicationExtensionABC)) and extension not in self._app_extensions:
self._app_extensions.append(extension)
elif (
issubclass(extension, StartupExtensionABC) or issubclass(extension, AsyncStartupExtensionABC)
) and extension not in self._startup_extensions:
elif (issubclass(extension, StartupExtensionABC)) and extension not in self._startup_extensions:
self._startup_extensions.append(extension)
return self
def _build_startup(self):
for ex in self._startup_extensions:
extension = ex()
extension.configure_configuration(Configuration, Environment)
extension.configure_services(self._services, Environment)
if self._startup is not None:
self._startup.configure_configuration(Configuration, Environment)
self._startup.configure_services(self._services, Environment)
async def _build_async_startup(self):
for ex in self._startup_extensions:
extension = ex()
await extension.configure_configuration(Configuration, Environment)
await extension.configure_services(self._services, Environment)
if self._startup is not None:
await self._startup.configure_configuration(Configuration, Environment)
await self._startup.configure_services(self._services, Environment)
def build(self) -> ApplicationABC:
self._build_startup()
for extension in self._startup_extensions:
Host.run(extension.configure_configuration)
Host.run(extension.configure_services, self._services)
config = Configuration
services = self._services.build_service_provider()
if self._startup is not None:
Host.run(self._startup.configure_configuration)
Host.run(self._startup.configure_services, self._services)
for ex in self._app_extensions:
extension = ex()
extension.run(config, services)
for extension in self._app_extensions:
Host.run(extension.run, self.service_provider)
return self._app(services)
async def build_async(self) -> ApplicationABC:
await self._build_async_startup()
config = Configuration
services = self._services.build_service_provider()
for ex in self._app_extensions:
extension = ex()
await extension.run(config, services)
return self._app(services)
return self._app(self.service_provider)

View File

@@ -1,47 +0,0 @@
from abc import ABC, abstractmethod
from typing import Type
from cpl.application.application_abc import ApplicationABC
from cpl.application.startup_abc import StartupABC
class ApplicationBuilderABC(ABC):
r"""ABC for the :class:`cpl.application.application_builder.ApplicationBuilder`"""
@abstractmethod
def __init__(self, *args):
pass
@abstractmethod
def use_startup(self, startup: Type[StartupABC]):
r"""Sets the custom startup class to use
Parameter:
startup: Type[:class:`cpl.application.startup_abc.StartupABC`]
Startup class to use
"""
@abstractmethod
async def use_startup(self, startup: Type[StartupABC]):
r"""Sets the custom startup class to use async
Parameter:
startup: Type[:class:`cpl.application.startup_abc.StartupABC`]
Startup class to use
"""
@abstractmethod
def build(self) -> ApplicationABC:
r"""Creates custom application object
Returns:
Object of :class:`cpl.application.application_abc.ApplicationABC`
"""
@abstractmethod
async def build_async(self) -> ApplicationABC:
r"""Creates custom application object async
Returns:
Object of :class:`cpl.application.application_abc.ApplicationABC`
"""

View File

@@ -1,14 +0,0 @@
from abc import ABC, abstractmethod
from cpl.core.configuration.configuration import Configuration
from cpl.dependency import ServiceProviderABC
class ApplicationExtensionABC(ABC):
@abstractmethod
def __init__(self):
pass
@abstractmethod
def run(self, config: Configuration, services: ServiceProviderABC):
pass

View File

@@ -1,14 +0,0 @@
from abc import ABC, abstractmethod
from cpl.core.configuration.configuration import Configuration
from cpl.dependency import ServiceProviderABC
class AsyncApplicationExtensionABC(ABC):
@abstractmethod
def __init__(self):
pass
@abstractmethod
async def run(self, config: Configuration, services: ServiceProviderABC):
pass

View File

@@ -1,31 +0,0 @@
from abc import ABC, abstractmethod
from cpl.core.configuration.configuration import Configuration
from cpl.dependency.service_collection import ServiceCollection
from cpl.core.environment.environment import Environment
class AsyncStartupExtensionABC(ABC):
r"""ABC for startup extension classes"""
@abstractmethod
def __init__(self):
pass
@abstractmethod
async def configure_configuration(self, config: Configuration, env: Environment):
r"""Creates configuration of application
Parameter:
config: :class:`cpl.core.configuration.configuration_abc.Configuration`
env: :class:`cpl.core.environment.application_environment_abc`
"""
@abstractmethod
async def configure_services(self, service: ServiceCollection, env: Environment):
r"""Creates service provider
Parameter:
services: :class:`cpl.dependency.service_collection`
env: :class:`cpl.core.environment.application_environment_abc`
"""

View File

@@ -0,0 +1,17 @@
import asyncio
from typing import Callable
class Host:
_loop = asyncio.get_event_loop()
@classmethod
def get_loop(cls):
return cls._loop
@classmethod
def run(cls, func: Callable, *args, **kwargs):
if asyncio.iscoroutinefunction(func):
return cls._loop.run_until_complete(func(*args, **kwargs))
return func(*args, **kwargs)

View File

@@ -1,31 +0,0 @@
from abc import ABC, abstractmethod
from cpl.core.configuration import Configuration
from cpl.dependency.service_collection import ServiceCollection
from cpl.core.environment import Environment
class StartupABC(ABC):
r"""ABC for the startup class"""
@abstractmethod
def __init__(self):
pass
@abstractmethod
def configure_configuration(self, config: Configuration, env: Environment):
r"""Creates configuration of application
Parameter:
config: :class:`cpl.core.configuration.configuration_abc.ConfigurationABC`
env: :class:`cpl.core.environment.application_environment_abc`
"""
@abstractmethod
def configure_services(self, service: ServiceCollection, env: Environment):
r"""Creates service provider
Parameter:
services: :class:`cpl.dependency.service_collection`
env: :class:`cpl.core.environment.application_environment_abc`
"""

View File

@@ -1,33 +0,0 @@
from abc import ABC, abstractmethod
from cpl.core.configuration import Configuration
from cpl.dependency.service_collection import ServiceCollection
from cpl.core.environment.environment import Environment
class StartupExtensionABC(ABC):
r"""ABC for startup extension classes"""
@abstractmethod
def __init__(self):
pass
@abstractmethod
def configure_configuration(self, config: Configuration, env: Environment):
r"""Creates configuration of application
Parameter:
config: :class:`cpl.core.configuration.configuration_abc.ConfigurationABC`
env: :class:`cpl.core.environment.application_environment_abc`
"""
@abstractmethod
def configure_services(self, service: ServiceCollection, env: Environment):
r"""Creates service provider
Parameter:
services: :class:`cpl.dependency.service_collection`
env: :class:`cpl.core.environment.application_environment_abc`
"""

View File

@@ -0,0 +1,82 @@
from enum import Enum
from typing import Type
from cpl.application.abc import ApplicationABC as _ApplicationABC
from cpl.auth import permission as _permission
from cpl.auth.keycloak.keycloak_admin import KeycloakAdmin as _KeycloakAdmin
from cpl.auth.keycloak.keycloak_client import KeycloakClient as _KeycloakClient
from cpl.dependency.service_collection import ServiceCollection as _ServiceCollection
from .auth_logger import AuthLogger
from .keycloak_settings import KeycloakSettings
from .permission_seeder import PermissionSeeder
def _with_permissions(self: _ApplicationABC, *permissions: Type[Enum]) -> _ApplicationABC:
from cpl.auth.permission.permissions_registry import PermissionsRegistry
for perm in permissions:
PermissionsRegistry.with_enum(perm)
return self
def _add_daos(collection: _ServiceCollection):
from .schema._administration.auth_user_dao import AuthUserDao
from .schema._administration.api_key_dao import ApiKeyDao
from .schema._permission.api_key_permission_dao import ApiKeyPermissionDao
from .schema._permission.permission_dao import PermissionDao
from .schema._permission.role_dao import RoleDao
from .schema._permission.role_permission_dao import RolePermissionDao
from .schema._permission.role_user_dao import RoleUserDao
collection.add_singleton(AuthUserDao)
collection.add_singleton(ApiKeyDao)
collection.add_singleton(ApiKeyPermissionDao)
collection.add_singleton(PermissionDao)
collection.add_singleton(RoleDao)
collection.add_singleton(RolePermissionDao)
collection.add_singleton(RoleUserDao)
def add_auth(collection: _ServiceCollection):
import os
from cpl.core.console import Console
from cpl.database.service.migration_service import MigrationService
from cpl.database.model.server_type import ServerType, ServerTypes
try:
collection.add_singleton(_KeycloakClient)
collection.add_singleton(_KeycloakAdmin)
_add_daos(collection)
provider = collection.build()
migration_service: MigrationService = provider.get_service(MigrationService)
if ServerType.server_type == ServerTypes.POSTGRES:
migration_service.with_directory(
os.path.join(os.path.dirname(os.path.realpath(__file__)), "scripts/postgres")
)
elif ServerType.server_type == ServerTypes.MYSQL:
migration_service.with_directory(os.path.join(os.path.dirname(os.path.realpath(__file__)), "scripts/mysql"))
except ImportError as e:
Console.error("cpl-auth is not installed", str(e))
def add_permission(collection: _ServiceCollection):
from cpl.auth.permission_seeder import PermissionSeeder
from cpl.database.abc.data_seeder_abc import DataSeederABC
from cpl.auth.permission.permissions_registry import PermissionsRegistry
from cpl.auth.permission.permissions import Permissions
try:
collection.add_singleton(DataSeederABC, PermissionSeeder)
PermissionsRegistry.with_enum(Permissions)
except ImportError as e:
from cpl.core.console import Console
Console.error("cpl-auth is not installed", str(e))
_ServiceCollection.with_module(add_auth, __name__)
_ServiceCollection.with_module(add_permission, _permission.__name__)
_ApplicationABC.extend(_ApplicationABC.with_permissions, _with_permissions)

View File

@@ -0,0 +1,8 @@
from cpl.core.log import Logger
from cpl.core.typing import Source
class AuthLogger(Logger):
def __init__(self, source: Source):
Logger.__init__(self, source, "auth")

View File

@@ -0,0 +1,3 @@
from .keycloak_admin import KeycloakAdmin
from .keycloak_client import KeycloakClient
from .keycloak_user import KeycloakUser

View File

@@ -0,0 +1,24 @@
from keycloak import KeycloakAdmin as _KeycloakAdmin, KeycloakOpenIDConnection
from cpl.auth.auth_logger import AuthLogger
from cpl.auth.keycloak_settings import KeycloakSettings
_logger = AuthLogger("keycloak")
class KeycloakAdmin(_KeycloakAdmin):
def __init__(self, settings: KeycloakSettings):
_logger.info("Initializing Keycloak admin")
_connection = KeycloakOpenIDConnection(
server_url=settings.url,
client_id=settings.client_id,
realm_name=settings.realm,
client_secret_key=settings.client_secret,
)
_KeycloakAdmin.__init__(
self,
connection=_connection,
)
self.__connection = _connection

View File

@@ -0,0 +1,26 @@
from keycloak import KeycloakOpenID, KeycloakAdmin, KeycloakOpenIDConnection
from cpl.auth.auth_logger import AuthLogger
from cpl.auth.keycloak_settings import KeycloakSettings
_logger = AuthLogger("keycloak")
class KeycloakClient(KeycloakOpenID):
def __init__(self, settings: KeycloakSettings):
KeycloakOpenID.__init__(
self,
server_url=settings.url,
client_id=settings.client_id,
realm_name=settings.realm,
client_secret_key=settings.client_secret,
)
_logger.info("Initializing Keycloak client")
connection = KeycloakOpenIDConnection(
server_url=settings.url,
client_id=settings.client_id,
realm_name=settings.realm,
client_secret_key=settings.client_secret,
)
self._admin = KeycloakAdmin(connection=connection)

View File

@@ -0,0 +1,36 @@
from cpl.core.utils.get_value import get_value
from cpl.dependency import ServiceProviderABC
class KeycloakUser:
def __init__(self, source: dict):
self._username = get_value(source, "preferred_username", str)
self._email = get_value(source, "email", str)
self._email_verified = get_value(source, "email_verified", bool)
self._name = get_value(source, "name", str)
@property
def username(self) -> str:
return self._username
@property
def email(self) -> str:
return self._email
@property
def email_verified(self) -> bool:
return self._email_verified
@property
def name(self) -> str:
return self._name
# Attrs from keycloak
@property
def id(self) -> str:
from cpl.auth import KeycloakAdmin
keycloak_admin: KeycloakAdmin = ServiceProviderABC.get_global_service(KeycloakAdmin)
return keycloak_admin.get_user_id(self._username)

View File

@@ -0,0 +1,37 @@
from typing import Optional
from cpl.core.configuration.configuration_model_abc import ConfigurationModelABC
from cpl.core.environment import Environment
class KeycloakSettings(ConfigurationModelABC):
def __init__(
self,
url: str = Environment.get("KEYCLOAK_URL", str),
client_id: str = Environment.get("KEYCLOAK_CLIENT_ID", str),
realm: str = Environment.get("KEYCLOAK_REALM", str),
client_secret: str = Environment.get("KEYCLOAK_CLIENT_SECRET", str),
):
ConfigurationModelABC.__init__(self)
self._url: Optional[str] = url
self._client_id: Optional[str] = client_id
self._realm: Optional[str] = realm
self._client_secret: Optional[str] = client_secret
@property
def url(self) -> Optional[str]:
return self._url
@property
def client_id(self) -> Optional[str]:
return self._client_id
@property
def realm(self) -> Optional[str]:
return self._realm
@property
def client_secret(self) -> Optional[str]:
return self._client_secret

View File

@@ -0,0 +1,36 @@
from enum import Enum
class Permissions(Enum):
""" """
"""
Administration
"""
# administrator
administrator = "administrator"
# api keys
api_keys = "api_keys"
api_keys_create = "api_keys.create"
api_keys_update = "api_keys.update"
api_keys_delete = "api_keys.delete"
# users
users = "users"
users_create = "users.create"
users_update = "users.update"
users_delete = "users.delete"
# settings
settings = "settings"
settings_update = "settings.update"
"""
Permissions
"""
# roles
roles = "roles"
roles_create = "roles.create"
roles_update = "roles.update"
roles_delete = "roles.delete"

View File

@@ -0,0 +1,24 @@
from enum import Enum
from typing import Type
class PermissionsRegistry:
_permissions: dict[str, str] = {}
@classmethod
def get(cls):
return cls._permissions.keys()
@classmethod
def descriptions(cls):
return {x: cls._permissions[x] for x in cls._permissions if cls._permissions[x] is not None}
@classmethod
def set(cls, permission: str, description: str = None):
cls._permissions[permission] = description
@classmethod
def with_enum(cls, e: Type[Enum]):
perms = [x.value for x in e]
for perm in perms:
cls.set(str(perm))

View File

@@ -0,0 +1,120 @@
from cpl.auth.permission.permissions import Permissions
from cpl.auth.permission.permissions_registry import PermissionsRegistry
from cpl.auth.schema import (
Permission,
Role,
RolePermission,
ApiKey,
ApiKeyPermission,
PermissionDao,
RoleDao,
RolePermissionDao,
ApiKeyDao,
ApiKeyPermissionDao,
)
from cpl.core.utils.get_value import get_value
from cpl.database.abc.data_seeder_abc import DataSeederABC
from cpl.database.db_logger import DBLogger
_logger = DBLogger(__name__)
class PermissionSeeder(DataSeederABC):
def __init__(
self,
permission_dao: PermissionDao,
role_dao: RoleDao,
role_permission_dao: RolePermissionDao,
api_key_dao: ApiKeyDao,
api_key_permission_dao: ApiKeyPermissionDao,
):
DataSeederABC.__init__(self)
self._permission_dao = permission_dao
self._role_dao = role_dao
self._role_permission_dao = role_permission_dao
self._api_key_dao = api_key_dao
self._api_key_permission_dao = api_key_permission_dao
async def seed(self):
permissions = await self._permission_dao.get_all()
possible_permissions = [permission for permission in PermissionsRegistry.get()]
if len(permissions) == len(possible_permissions):
_logger.info("Permissions already existing")
await self._update_missing_descriptions()
return
to_delete = []
for permission in permissions:
if permission.name in possible_permissions:
continue
to_delete.append(permission)
await self._permission_dao.delete_many(to_delete, hard_delete=True)
_logger.warning("Permissions incomplete")
permission_names = [permission.name for permission in permissions]
await self._permission_dao.create_many(
[
Permission(
0,
permission,
get_value(PermissionsRegistry.descriptions(), permission, str),
)
for permission in possible_permissions
if permission not in permission_names
]
)
await self._update_missing_descriptions()
await self._add_missing_to_role()
await self._add_missing_to_api_key()
async def _add_missing_to_role(self):
admin_role = await self._role_dao.find_single_by([{Role.id: 1}, {Role.name: "admin"}])
if admin_role is None:
return
admin_permissions = await self._role_permission_dao.get_by_role_id(admin_role.id, with_deleted=True)
to_assign = [
RolePermission(0, admin_role.id, permission.id)
for permission in await self._permission_dao.get_all()
if permission.id not in [x.permission_id for x in admin_permissions]
]
await self._role_permission_dao.create_many(to_assign)
async def _add_missing_to_api_key(self):
admin_api_key = await self._api_key_dao.find_single_by([{ApiKey.id: 1}, {ApiKey.identifier: "admin"}])
if admin_api_key is None:
return
admin_permissions = await self._api_key_permission_dao.find_by_api_key_id(admin_api_key.id, with_deleted=True)
to_assign = [
ApiKeyPermission(0, admin_api_key.id, permission.id)
for permission in await self._permission_dao.get_all()
if permission.id not in [x.permission_id for x in admin_permissions]
]
await self._api_key_permission_dao.create_many(to_assign)
async def _update_missing_descriptions(self):
permissions = {
permission.name: permission
for permission in await self._permission_dao.find_by([{Permission.description: None}])
}
to_update = []
if len(permissions) == 0:
return
for key in PermissionsRegistry.descriptions():
if key.value not in permissions:
continue
permissions[key.value].description = PermissionsRegistry.descriptions()[key]
to_update.append(permissions[key.value])
if len(to_update) == 0:
return
await self._permission_dao.update_many(to_update)

View File

@@ -0,0 +1,15 @@
from ._administration.api_key import ApiKey
from ._administration.api_key_dao import ApiKeyDao
from ._administration.auth_user import AuthUser
from ._administration.auth_user_dao import AuthUserDao
from ._permission.api_key_permission import ApiKeyPermission
from ._permission.api_key_permission_dao import ApiKeyPermissionDao
from ._permission.permission import Permission
from ._permission.permission_dao import PermissionDao
from ._permission.role import Role
from ._permission.role_dao import RoleDao
from ._permission.role_permission import RolePermission
from ._permission.role_permission_dao import RolePermissionDao
from ._permission.role_user import RoleUser
from ._permission.role_user_dao import RoleUserDao

View File

@@ -0,0 +1,59 @@
import secrets
from datetime import datetime
from typing import Optional
from async_property import async_property
from cpl.auth.permission.permissions import Permissions
from cpl.core.environment import Environment
from cpl.core.log import Logger
from cpl.core.typing import SerialId, Id
from cpl.database.abc import DbModelABC
from cpl.dependency import ServiceProviderABC
_logger = Logger(__name__)
class ApiKey(DbModelABC):
def __init__(
self,
id: SerialId,
identifier: str,
key: str,
deleted: bool = False,
editor_id: Optional[Id] = None,
created: Optional[datetime] = None,
updated: Optional[datetime] = None,
):
DbModelABC.__init__(self, id, deleted, editor_id, created, updated)
self._identifier = identifier
self._key = key
@property
def identifier(self) -> str:
return self._identifier
@property
def key(self) -> str:
return self._key
@async_property
async def permissions(self):
from cpl.auth.schema._permission.api_key_permission_dao import ApiKeyPermissionDao
api_key_permission_dao: ApiKeyPermissionDao = ServiceProviderABC.get_global_service(ApiKeyPermissionDao)
return [await x.permission for x in await api_key_permission_dao.find_by_api_key_id(self.id)]
async def has_permission(self, permission: Permissions) -> bool:
return permission.value in [x.name for x in await self.permissions]
def set_new_api_key(self):
self._key = self.new_key()
@staticmethod
def new_key() -> str:
return f"api_{secrets.token_urlsafe(Environment.get("API_KEY_LENGTH", int, 64))}"
@classmethod
def new(cls, identifier: str) -> "ApiKey":
return ApiKey(0, identifier, cls.new_key())

View File

@@ -0,0 +1,32 @@
from typing import Optional
from cpl.auth.schema._administration.api_key import ApiKey
from cpl.database import TableManager
from cpl.database.abc import DbModelDaoABC
from cpl.database.db_logger import DBLogger
_logger = DBLogger(__name__)
class ApiKeyDao(DbModelDaoABC[ApiKey]):
def __init__(self):
DbModelDaoABC.__init__(self, __name__, ApiKey, TableManager.get("api_keys"))
self.attribute(ApiKey.identifier, str)
self.attribute(ApiKey.key, str, "keystring")
async def get_by_identifier(self, ident: str) -> ApiKey:
result = await self._db.select_map(f"SELECT * FROM {self._table_name} WHERE Identifier = '{ident}'")
return self.to_object(result[0])
async def get_by_key(self, key: str) -> ApiKey:
result = await self._db.select_map(f"SELECT * FROM {self._table_name} WHERE Keystring = '{key}'")
return self.to_object(result[0])
async def find_by_key(self, key: str) -> Optional[ApiKey]:
result = await self._db.select_map(f"SELECT * FROM {self._table_name} WHERE Keystring = '{key}'")
if not result or len(result) == 0:
return None
return self.to_object(result[0])

View File

@@ -0,0 +1,89 @@
import uuid
from datetime import datetime
from typing import Optional
from async_property import async_property
from keycloak import KeycloakGetError
from cpl.auth.keycloak import KeycloakAdmin
from cpl.auth.auth_logger import AuthLogger
from cpl.auth.permission.permissions import Permissions
from cpl.core.typing import SerialId
from cpl.database.abc import DbModelABC
from cpl.dependency import ServiceProviderABC
_logger = AuthLogger(__name__)
class AuthUser(DbModelABC):
def __init__(
self,
id: SerialId,
keycloak_id: str,
deleted: bool = False,
editor_id: Optional[SerialId] = None,
created: Optional[datetime] = None,
updated: Optional[datetime] = None,
):
DbModelABC.__init__(self, id, deleted, editor_id, created, updated)
self._keycloak_id = keycloak_id
@property
def keycloak_id(self) -> str:
return self._keycloak_id
@property
def username(self):
if self._keycloak_id == str(uuid.UUID(int=0)):
return "ANONYMOUS"
try:
keycloak_admin: KeycloakAdmin = ServiceProviderABC.get_global_service(KeycloakAdmin)
return keycloak_admin.get_user(self._keycloak_id).get("username")
except KeycloakGetError as e:
return "UNKNOWN"
except Exception as e:
_logger.error(f"Failed to get user {self._keycloak_id} from Keycloak", e)
return "UNKNOWN"
@property
def email(self):
if self._keycloak_id == str(uuid.UUID(int=0)):
return "ANONYMOUS"
try:
keycloak_admin: KeycloakAdmin = ServiceProviderABC.get_global_service(KeycloakAdmin)
return keycloak_admin.get_user(self._keycloak_id).get("email")
except KeycloakGetError as e:
return "UNKNOWN"
except Exception as e:
_logger.error(f"Failed to get user {self._keycloak_id} from Keycloak", e)
return "UNKNOWN"
@async_property
async def roles(self):
from cpl.auth.schema._permission.role_user_dao import RoleUserDao
role_user_dao: RoleUserDao = ServiceProviderABC.get_global_service(RoleUserDao)
return [await x.role for x in await role_user_dao.get_by_user_id(self.id)]
@async_property
async def permissions(self):
from cpl.auth.schema._administration.auth_user_dao import AuthUserDao
auth_user_dao: AuthUserDao = ServiceProviderABC.get_global_service(AuthUserDao)
return await auth_user_dao.get_permissions(self.id)
async def has_permission(self, permission: Permissions) -> bool:
from cpl.auth.schema._administration.auth_user_dao import AuthUserDao
auth_user_dao: AuthUserDao = ServiceProviderABC.get_global_service(AuthUserDao)
return await auth_user_dao.has_permission(self.id, permission)
async def anonymize(self):
from cpl.auth.schema._administration.auth_user_dao import AuthUserDao
auth_user_dao: AuthUserDao = ServiceProviderABC.get_global_service(AuthUserDao)
self._keycloak_id = str(uuid.UUID(int=0))
await auth_user_dao.update(self)

View File

@@ -0,0 +1,72 @@
from typing import Optional, Union
from cpl.auth.permission.permissions import Permissions
from cpl.auth.schema._administration.auth_user import AuthUser
from cpl.database import TableManager
from cpl.database.abc import DbModelDaoABC
from cpl.database.db_logger import DBLogger
from cpl.database.external_data_temp_table_builder import ExternalDataTempTableBuilder
from cpl.dependency import ServiceProviderABC
_logger = DBLogger(__name__)
class AuthUserDao(DbModelDaoABC[AuthUser]):
def __init__(self):
DbModelDaoABC.__init__(self, __name__, AuthUser, TableManager.get("auth_users"))
self.attribute(AuthUser.keycloak_id, str, aliases=["keycloakId"])
async def get_users():
return [(x.id, x.username, x.email) for x in await self.get_all()]
self.use_external_fields(
ExternalDataTempTableBuilder()
.with_table_name(self._table_name)
.with_field("id", "int", True)
.with_field("username", "text")
.with_field("email", "text")
.with_value_getter(get_users)
)
async def get_by_keycloak_id(self, keycloak_id: str) -> AuthUser:
return await self.get_single_by({AuthUser.keycloak_id: keycloak_id})
async def find_by_keycloak_id(self, keycloak_id: str) -> Optional[AuthUser]:
return await self.find_single_by({AuthUser.keycloak_id: keycloak_id})
async def has_permission(self, user_id: int, permission: Union[Permissions, str]) -> bool:
from cpl.auth.schema._permission.permission_dao import PermissionDao
permission_dao: PermissionDao = ServiceProviderABC.get_global_service(PermissionDao)
p = await permission_dao.get_by_name(permission if isinstance(permission, str) else permission.value)
result = await self._db.select_map(
f"""
SELECT COUNT(*)
FROM permission.role_users ru
JOIN permission.role_permissions rp ON ru.roleId = rp.roleId
WHERE ru.userId = {user_id}
AND rp.permissionId = {p.id}
AND ru.deleted = FALSE
AND rp.deleted = FALSE;
"""
)
if result is None or len(result) == 0:
return False
return result[0]["count"] > 0
async def get_permissions(self, user_id: int) -> list[Permissions]:
result = await self._db.select_map(
f"""
SELECT p.*
FROM permission.permissions p
JOIN permission.role_permissions rp ON p.id = rp.permissionId
JOIN permission.role_users ru ON rp.roleId = ru.roleId
WHERE ru.userId = {user_id}
AND rp.deleted = FALSE
AND ru.deleted = FALSE;
"""
)
return [Permissions(p["name"]) for p in result]

View File

@@ -0,0 +1,46 @@
from datetime import datetime
from typing import Optional
from async_property import async_property
from cpl.core.typing import SerialId
from cpl.database.abc import DbJoinModelABC
from cpl.dependency import ServiceProviderABC
class ApiKeyPermission(DbJoinModelABC):
def __init__(
self,
id: SerialId,
api_key_id: SerialId,
permission_id: SerialId,
deleted: bool = False,
editor_id: Optional[SerialId] = None,
created: Optional[datetime] = None,
updated: Optional[datetime] = None,
):
DbJoinModelABC.__init__(self, api_key_id, permission_id, id, deleted, editor_id, created, updated)
self._api_key_id = api_key_id
self._permission_id = permission_id
@property
def api_key_id(self) -> int:
return self._api_key_id
@async_property
async def api_key(self):
from cpl.auth.schema._administration.api_key_dao import ApiKeyDao
api_key_dao: ApiKeyDao = ServiceProviderABC.get_global_service(ApiKeyDao)
return await api_key_dao.get_by_id(self._api_key_id)
@property
def permission_id(self) -> int:
return self._permission_id
@async_property
async def permission(self):
from cpl.auth.schema._permission.permission_dao import PermissionDao
permission_dao: PermissionDao = ServiceProviderABC.get_global_service(PermissionDao)
return await permission_dao.get_by_id(self._permission_id)

View File

@@ -0,0 +1,29 @@
from cpl.auth.schema._permission.api_key_permission import ApiKeyPermission
from cpl.database import TableManager
from cpl.database.abc import DbModelDaoABC
from cpl.database.db_logger import DBLogger
_logger = DBLogger(__name__)
class ApiKeyPermissionDao(DbModelDaoABC[ApiKeyPermission]):
def __init__(self):
DbModelDaoABC.__init__(self, __name__, ApiKeyPermission, TableManager.get("api_key_permissions"))
self.attribute(ApiKeyPermission.api_key_id, int)
self.attribute(ApiKeyPermission.permission_id, int)
async def find_by_api_key_id(self, api_key_id: int, with_deleted=False) -> list[ApiKeyPermission]:
f = [{ApiKeyPermission.api_key_id: api_key_id}]
if not with_deleted:
f.append({ApiKeyPermission.deleted: False})
return await self.find_by(f)
async def find_by_permission_id(self, permission_id: int, with_deleted=False) -> list[ApiKeyPermission]:
f = [{ApiKeyPermission.permission_id: permission_id}]
if not with_deleted:
f.append({ApiKeyPermission.deleted: False})
return await self.find_by(f)

View File

@@ -0,0 +1,37 @@
from datetime import datetime
from typing import Optional
from cpl.core.typing import SerialId
from cpl.database.abc import DbModelABC
class Permission(DbModelABC):
def __init__(
self,
id: SerialId,
name: str,
description: str,
deleted: bool = False,
editor_id: Optional[SerialId] = None,
created: Optional[datetime] = None,
updated: Optional[datetime] = None,
):
DbModelABC.__init__(self, id, deleted, editor_id, created, updated)
self._name = name
self._description = description
@property
def name(self) -> str:
return self._name
@name.setter
def name(self, value: str):
self._name = value
@property
def description(self) -> str:
return self._description
@description.setter
def description(self, value: str):
self._description = value

View File

@@ -0,0 +1,21 @@
from typing import Optional
from cpl.auth.schema._permission.permission import Permission
from cpl.database import TableManager
from cpl.database.abc import DbModelDaoABC
from cpl.database.db_logger import DBLogger
_logger = DBLogger(__name__)
class PermissionDao(DbModelDaoABC[Permission]):
def __init__(self):
DbModelDaoABC.__init__(self, __name__, Permission, TableManager.get("permissions"))
self.attribute(Permission.name, str)
self.attribute(Permission.description, Optional[str])
async def get_by_name(self, name: str) -> Permission:
result = await self._db.select_map(f"SELECT * FROM {self._table_name} WHERE Name = '{name}'")
return self.to_object(result[0])

View File

@@ -0,0 +1,66 @@
from datetime import datetime
from typing import Optional
from async_property import async_property
from cpl.auth.permission.permissions import Permissions
from cpl.core.typing import SerialId
from cpl.database.abc import DbModelABC
from cpl.dependency import ServiceProviderABC
class Role(DbModelABC):
def __init__(
self,
id: SerialId,
name: str,
description: str,
deleted: bool = False,
editor_id: Optional[SerialId] = None,
created: Optional[datetime] = None,
updated: Optional[datetime] = None,
):
DbModelABC.__init__(self, id, deleted, editor_id, created, updated)
self._name = name
self._description = description
@property
def name(self) -> str:
return self._name
@name.setter
def name(self, value: str):
self._name = value
@property
def description(self) -> str:
return self._description
@description.setter
def description(self, value: str):
self._description = value
@async_property
async def permissions(self):
from cpl.auth.schema._permission.role_permission_dao import RolePermissionDao
role_permission_dao: RolePermissionDao = ServiceProviderABC.get_global_service(RolePermissionDao)
return [await x.permission for x in await role_permission_dao.get_by_role_id(self.id)]
@async_property
async def users(self):
from cpl.auth.schema._permission.role_user_dao import RoleUserDao
role_user_dao: RoleUserDao = ServiceProviderABC.get_global_service(RoleUserDao)
return [await x.user for x in await role_user_dao.get_by_role_id(self.id)]
async def has_permission(self, permission: Permissions) -> bool:
from cpl.auth.schema._permission.permission_dao import PermissionDao
from cpl.auth.schema._permission.role_permission_dao import RolePermissionDao
permission_dao: PermissionDao = ServiceProviderABC.get_global_service(PermissionDao)
role_permission_dao: RolePermissionDao = ServiceProviderABC.get_global_service(RolePermissionDao)
p = await permission_dao.get_by_name(permission.value)
return p.id in [x.id for x in await role_permission_dao.get_by_role_id(self.id)]

View File

@@ -0,0 +1,17 @@
from cpl.auth.schema._permission.role import Role
from cpl.database import TableManager
from cpl.database.abc import DbModelDaoABC
from cpl.database.db_logger import DBLogger
_logger = DBLogger(__name__)
class RoleDao(DbModelDaoABC[Role]):
def __init__(self):
DbModelDaoABC.__init__(self, __name__, Role, TableManager.get("roles"))
self.attribute(Role.name, str)
self.attribute(Role.description, str)
async def get_by_name(self, name: str) -> Role:
result = await self._db.select_map(f"SELECT * FROM {self._table_name} WHERE Name = '{name}'")
return self.to_object(result[0])

View File

@@ -0,0 +1,46 @@
from datetime import datetime
from typing import Optional
from async_property import async_property
from cpl.core.typing import SerialId
from cpl.database.abc import DbModelABC
from cpl.dependency import ServiceProviderABC
class RolePermission(DbModelABC):
def __init__(
self,
id: SerialId,
role_id: SerialId,
permission_id: SerialId,
deleted: bool = False,
editor_id: Optional[SerialId] = None,
created: Optional[datetime] = None,
updated: Optional[datetime] = None,
):
DbModelABC.__init__(self, id, deleted, editor_id, created, updated)
self._role_id = role_id
self._permission_id = permission_id
@property
def role_id(self) -> int:
return self._role_id
@async_property
async def role(self):
from cpl.auth.schema._permission.role_dao import RoleDao
role_dao: RoleDao = ServiceProviderABC.get_global_service(RoleDao)
return await role_dao.get_by_id(self._role_id)
@property
def permission_id(self) -> int:
return self._permission_id
@async_property
async def permission(self):
from cpl.auth.schema._permission.permission_dao import PermissionDao
permission_dao: PermissionDao = ServiceProviderABC.get_global_service(PermissionDao)
return await permission_dao.get_by_id(self._permission_id)

View File

@@ -0,0 +1,29 @@
from cpl.auth.schema._permission.role_permission import RolePermission
from cpl.database import TableManager
from cpl.database.abc import DbModelDaoABC
from cpl.database.db_logger import DBLogger
_logger = DBLogger(__name__)
class RolePermissionDao(DbModelDaoABC[RolePermission]):
def __init__(self):
DbModelDaoABC.__init__(self, __name__, RolePermission, TableManager.get("role_permissions"))
self.attribute(RolePermission.role_id, int)
self.attribute(RolePermission.permission_id, int)
async def get_by_role_id(self, role_id: int, with_deleted=False) -> list[RolePermission]:
f = [{RolePermission.role_id: role_id}]
if not with_deleted:
f.append({RolePermission.deleted: False})
return await self.find_by(f)
async def get_by_permission_id(self, permission_id: int, with_deleted=False) -> list[RolePermission]:
f = [{RolePermission.permission_id: permission_id}]
if not with_deleted:
f.append({RolePermission.deleted: False})
return await self.find_by(f)

View File

@@ -0,0 +1,46 @@
from datetime import datetime
from typing import Optional
from async_property import async_property
from cpl.core.typing import SerialId
from cpl.database.abc import DbJoinModelABC
from cpl.dependency import ServiceProviderABC
class RoleUser(DbJoinModelABC):
def __init__(
self,
id: SerialId,
user_id: SerialId,
role_id: SerialId,
deleted: bool = False,
editor_id: Optional[SerialId] = None,
created: Optional[datetime] = None,
updated: Optional[datetime] = None,
):
DbJoinModelABC.__init__(self, id, user_id, role_id, deleted, editor_id, created, updated)
self._user_id = user_id
self._role_id = role_id
@property
def user_id(self) -> int:
return self._user_id
@async_property
async def user(self):
from cpl.auth.schema._administration.auth_user_dao import AuthUserDao
auth_user_dao: AuthUserDao = ServiceProviderABC.get_global_service(AuthUserDao)
return await auth_user_dao.get_by_id(self._user_id)
@property
def role_id(self) -> int:
return self._role_id
@async_property
async def role(self):
from cpl.auth.schema._permission.role_dao import RoleDao
role_dao: RoleDao = ServiceProviderABC.get_global_service(RoleDao)
return await role_dao.get_by_id(self._role_id)

View File

@@ -0,0 +1,29 @@
from cpl.auth.schema._permission.role_user import RoleUser
from cpl.database import TableManager
from cpl.database.abc import DbModelDaoABC
from cpl.database.db_logger import DBLogger
_logger = DBLogger(__name__)
class RoleUserDao(DbModelDaoABC[RoleUser]):
def __init__(self):
DbModelDaoABC.__init__(self, __name__, RoleUser, TableManager.get("role_users"))
self.attribute(RoleUser.role_id, int)
self.attribute(RoleUser.user_id, int)
async def get_by_role_id(self, rid: int, with_deleted=False) -> list[RoleUser]:
f = [{RoleUser.role_id: rid}]
if not with_deleted:
f.append({RoleUser.deleted: False})
return await self.find_by(f)
async def get_by_user_id(self, uid: int, with_deleted=False) -> list[RoleUser]:
f = [{RoleUser.user_id: uid}]
if not with_deleted:
f.append({RoleUser.deleted: False})
return await self.find_by(f)

View File

@@ -0,0 +1,44 @@
CREATE TABLE IF NOT EXISTS administration_auth_users
(
id INT AUTO_INCREMENT PRIMARY KEY,
keycloakId CHAR(36) NOT NULL,
-- for history
deleted BOOL NOT NULL DEFAULT FALSE,
editorId INT NULL,
created TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
CONSTRAINT UC_KeycloakId UNIQUE (keycloakId),
CONSTRAINT FK_EditorId FOREIGN KEY (editorId) REFERENCES administration_auth_users (id)
);
CREATE TABLE IF NOT EXISTS administration_auth_users_history
(
id INT AUTO_INCREMENT PRIMARY KEY,
keycloakId CHAR(36) NOT NULL,
-- for history
deleted BOOL NOT NULL,
editorId INT NULL,
created TIMESTAMP NOT NULL,
updated TIMESTAMP NOT NULL
);
CREATE TRIGGER TR_administration_auth_usersUpdate
AFTER UPDATE
ON administration_auth_users
FOR EACH ROW
BEGIN
INSERT INTO administration_auth_users_history
(id, keycloakId, deleted, editorId, created, updated)
VALUES (OLD.id, OLD.keycloakId, OLD.deleted, OLD.editorId, OLD.created, NOW());
END;
CREATE TRIGGER TR_administration_auth_usersDelete
AFTER DELETE
ON administration_auth_users
FOR EACH ROW
BEGIN
INSERT INTO administration_auth_users_history
(id, keycloakId, deleted, editorId, created, updated)
VALUES (OLD.id, OLD.keycloakId, 1, OLD.editorId, OLD.created, NOW());
END;

View File

@@ -0,0 +1,46 @@
CREATE TABLE IF NOT EXISTS administration_api_keys
(
id INT AUTO_INCREMENT PRIMARY KEY,
identifier VARCHAR(255) NOT NULL,
keyString VARCHAR(255) NOT NULL,
deleted BOOL NOT NULL DEFAULT FALSE,
editorId INT NULL,
created TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
CONSTRAINT UC_Identifier_Key UNIQUE (identifier, keyString),
CONSTRAINT UC_Key UNIQUE (keyString),
CONSTRAINT FK_ApiKeys_Editor FOREIGN KEY (editorId) REFERENCES administration_auth_users (id)
);
CREATE TABLE IF NOT EXISTS administration_api_keys_history
(
id INT AUTO_INCREMENT PRIMARY KEY,
identifier VARCHAR(255) NOT NULL,
keyString VARCHAR(255) NOT NULL,
deleted BOOL NOT NULL,
editorId INT NULL,
created TIMESTAMP NOT NULL,
updated TIMESTAMP NOT NULL
);
CREATE TRIGGER TR_ApiKeysUpdate
AFTER UPDATE
ON administration_api_keys
FOR EACH ROW
BEGIN
INSERT INTO administration_api_keys_history
(id, identifier, keyString, deleted, editorId, created, updated)
VALUES (OLD.id, OLD.identifier, OLD.keyString, OLD.deleted, OLD.editorId, OLD.created, NOW());
END;
CREATE TRIGGER TR_ApiKeysDelete
AFTER DELETE
ON administration_api_keys
FOR EACH ROW
BEGIN
INSERT INTO administration_api_keys_history
(id, identifier, keyString, deleted, editorId, created, updated)
VALUES (OLD.id, OLD.identifier, OLD.keyString, 1, OLD.editorId, OLD.created, NOW());
END;

View File

@@ -0,0 +1,179 @@
CREATE TABLE IF NOT EXISTS permission_permissions
(
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL,
description TEXT NULL,
deleted BOOL NOT NULL DEFAULT FALSE,
editorId INT NULL,
created TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
CONSTRAINT UQ_PermissionName UNIQUE (name),
CONSTRAINT FK_Permissions_Editor FOREIGN KEY (editorId) REFERENCES administration_auth_users (id)
);
CREATE TABLE IF NOT EXISTS permission_permissions_history
(
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL,
description TEXT NULL,
deleted BOOL NOT NULL,
editorId INT NULL,
created TIMESTAMP NOT NULL,
updated TIMESTAMP NOT NULL
);
CREATE TRIGGER TR_PermissionsUpdate
AFTER UPDATE
ON permission_permissions
FOR EACH ROW
BEGIN
INSERT INTO permission_permissions_history
(id, name, description, deleted, editorId, created, updated)
VALUES (OLD.id, OLD.name, OLD.description, OLD.deleted, OLD.editorId, OLD.created, NOW());
END;
CREATE TRIGGER TR_PermissionsDelete
AFTER DELETE
ON permission_permissions
FOR EACH ROW
BEGIN
INSERT INTO permission_permissions_history
(id, name, description, deleted, editorId, created, updated)
VALUES (OLD.id, OLD.name, OLD.description, 1, OLD.editorId, OLD.created, NOW());
END;
CREATE TABLE IF NOT EXISTS permission_roles
(
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL,
description TEXT NULL,
deleted BOOL NOT NULL DEFAULT FALSE,
editorId INT NULL,
created TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
CONSTRAINT UQ_RoleName UNIQUE (name),
CONSTRAINT FK_Roles_Editor FOREIGN KEY (editorId) REFERENCES administration_auth_users (id)
);
CREATE TABLE IF NOT EXISTS permission_roles_history
(
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL,
description TEXT NULL,
deleted BOOL NOT NULL,
editorId INT NULL,
created TIMESTAMP NOT NULL,
updated TIMESTAMP NOT NULL
);
CREATE TRIGGER TR_RolesUpdate
AFTER UPDATE
ON permission_roles
FOR EACH ROW
BEGIN
INSERT INTO permission_roles_history
(id, name, description, deleted, editorId, created, updated)
VALUES (OLD.id, OLD.name, OLD.description, OLD.deleted, OLD.editorId, OLD.created, NOW());
END;
CREATE TRIGGER TR_RolesDelete
AFTER DELETE
ON permission_roles
FOR EACH ROW
BEGIN
INSERT INTO permission_roles_history
(id, name, description, deleted, editorId, created, updated)
VALUES (OLD.id, OLD.name, OLD.description, 1, OLD.editorId, OLD.created, NOW());
END;
CREATE TABLE IF NOT EXISTS permission_role_permissions
(
id INT AUTO_INCREMENT PRIMARY KEY,
RoleId INT NOT NULL,
permissionId INT NOT NULL,
deleted BOOL NOT NULL DEFAULT FALSE,
editorId INT NULL,
created TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
CONSTRAINT UQ_RolePermission UNIQUE (RoleId, permissionId),
CONSTRAINT FK_RolePermissions_Role FOREIGN KEY (RoleId) REFERENCES permission_roles (id) ON DELETE CASCADE,
CONSTRAINT FK_RolePermissions_Permission FOREIGN KEY (permissionId) REFERENCES permission_permissions (id) ON DELETE CASCADE,
CONSTRAINT FK_RolePermissions_Editor FOREIGN KEY (editorId) REFERENCES administration_auth_users (id)
);
CREATE TABLE IF NOT EXISTS permission_role_permissions_history
(
id INT AUTO_INCREMENT PRIMARY KEY,
RoleId INT NOT NULL,
permissionId INT NOT NULL,
deleted BOOL NOT NULL,
editorId INT NULL,
created TIMESTAMP NOT NULL,
updated TIMESTAMP NOT NULL
);
CREATE TRIGGER TR_RolePermissionsUpdate
AFTER UPDATE
ON permission_role_permissions
FOR EACH ROW
BEGIN
INSERT INTO permission_role_permissions_history
(id, RoleId, permissionId, deleted, editorId, created, updated)
VALUES (OLD.id, OLD.RoleId, OLD.permissionId, OLD.deleted, OLD.editorId, OLD.created, NOW());
END;
CREATE TRIGGER TR_RolePermissionsDelete
AFTER DELETE
ON permission_role_permissions
FOR EACH ROW
BEGIN
INSERT INTO permission_role_permissions_history
(id, RoleId, permissionId, deleted, editorId, created, updated)
VALUES (OLD.id, OLD.RoleId, OLD.permissionId, 1, OLD.editorId, OLD.created, NOW());
END;
CREATE TABLE IF NOT EXISTS permission_role_auth_users
(
id INT AUTO_INCREMENT PRIMARY KEY,
RoleId INT NOT NULL,
UserId INT NOT NULL,
deleted BOOL NOT NULL DEFAULT FALSE,
editorId INT NULL,
created TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
CONSTRAINT UQ_RoleUser UNIQUE (RoleId, UserId),
CONSTRAINT FK_Roleauth_users_Role FOREIGN KEY (RoleId) REFERENCES permission_roles (id) ON DELETE CASCADE,
CONSTRAINT FK_Roleauth_users_User FOREIGN KEY (UserId) REFERENCES administration_auth_users (id) ON DELETE CASCADE,
CONSTRAINT FK_Roleauth_users_Editor FOREIGN KEY (editorId) REFERENCES administration_auth_users (id)
);
CREATE TABLE IF NOT EXISTS permission_role_auth_users_history
(
id INT AUTO_INCREMENT PRIMARY KEY,
RoleId INT NOT NULL,
UserId INT NOT NULL,
deleted BOOL NOT NULL,
editorId INT NULL,
created TIMESTAMP NOT NULL,
updated TIMESTAMP NOT NULL
);
CREATE TRIGGER TR_Roleauth_usersUpdate
AFTER UPDATE
ON permission_role_auth_users
FOR EACH ROW
BEGIN
INSERT INTO permission_role_auth_users_history
(id, RoleId, UserId, deleted, editorId, created, updated)
VALUES (OLD.id, OLD.RoleId, OLD.UserId, OLD.deleted, OLD.editorId, OLD.created, NOW());
END;
CREATE TRIGGER TR_Roleauth_usersDelete
AFTER DELETE
ON permission_role_auth_users
FOR EACH ROW
BEGIN
INSERT INTO permission_role_auth_users_history
(id, RoleId, UserId, deleted, editorId, created, updated)
VALUES (OLD.id, OLD.RoleId, OLD.UserId, 1, OLD.editorId, OLD.created, NOW());
END;

View File

@@ -0,0 +1,46 @@
CREATE TABLE IF NOT EXISTS permission_api_key_permissions
(
id INT AUTO_INCREMENT PRIMARY KEY,
apiKeyId INT NOT NULL,
permissionId INT NOT NULL,
deleted BOOL NOT NULL DEFAULT FALSE,
editorId INT NULL,
created TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
CONSTRAINT UQ_ApiKeyPermission UNIQUE (apiKeyId, permissionId),
CONSTRAINT FK_ApiKeyPermissions_ApiKey FOREIGN KEY (apiKeyId) REFERENCES administration_api_keys (id) ON DELETE CASCADE,
CONSTRAINT FK_ApiKeyPermissions_Permission FOREIGN KEY (permissionId) REFERENCES permission_permissions (id) ON DELETE CASCADE,
CONSTRAINT FK_ApiKeyPermissions_Editor FOREIGN KEY (editorId) REFERENCES administration_auth_users (id)
);
CREATE TABLE IF NOT EXISTS permission_api_key_permissions_history
(
id INT AUTO_INCREMENT PRIMARY KEY,
apiKeyId INT NOT NULL,
permissionId INT NOT NULL,
deleted BOOL NOT NULL,
editorId INT NULL,
created TIMESTAMP NOT NULL,
updated TIMESTAMP NOT NULL
);
CREATE TRIGGER TR_ApiKeyPermissionsUpdate
AFTER UPDATE
ON permission_api_key_permissions
FOR EACH ROW
BEGIN
INSERT INTO permission_api_key_permissions_history
(id, apiKeyId, permissionId, deleted, editorId, created, updated)
VALUES (OLD.id, OLD.apiKeyId, OLD.permissionId, OLD.deleted, OLD.editorId, OLD.created, NOW());
END;
CREATE TRIGGER TR_ApiKeyPermissionsDelete
AFTER DELETE
ON permission_api_key_permissions
FOR EACH ROW
BEGIN
INSERT INTO permission_api_key_permissions_history
(id, apiKeyId, permissionId, deleted, editorId, created, updated)
VALUES (OLD.id, OLD.apiKeyId, OLD.permissionId, 1, OLD.editorId, OLD.created, NOW());
END;

View File

@@ -0,0 +1,26 @@
CREATE SCHEMA IF NOT EXISTS administration;
CREATE TABLE IF NOT EXISTS administration.auth_users
(
id SERIAL PRIMARY KEY,
keycloakId UUID NOT NULL,
-- for history
deleted BOOLEAN NOT NULL DEFAULT FALSE,
editorId INT NULL REFERENCES administration.auth_users (id),
created timestamptz NOT NULL DEFAULT NOW(),
updated timestamptz NOT NULL DEFAULT NOW(),
CONSTRAINT UC_KeycloakId UNIQUE (keycloakId)
);
CREATE TABLE IF NOT EXISTS administration.auth_users_history
(
LIKE administration.auth_users
);
CREATE TRIGGER users_history_trigger
BEFORE INSERT OR UPDATE OR DELETE
ON administration.auth_users
FOR EACH ROW
EXECUTE FUNCTION public.history_trigger_function();

View File

@@ -0,0 +1,28 @@
CREATE SCHEMA IF NOT EXISTS administration;
CREATE TABLE IF NOT EXISTS administration.api_keys
(
id SERIAL PRIMARY KEY,
identifier VARCHAR(255) NOT NULL,
keyString VARCHAR(255) NOT NULL,
-- for history
deleted BOOLEAN NOT NULL DEFAULT FALSE,
editorId INT NULL REFERENCES administration.auth_users (id),
created timestamptz NOT NULL DEFAULT NOW(),
updated timestamptz NOT NULL DEFAULT NOW(),
CONSTRAINT UC_Identifier_Key UNIQUE (identifier, keyString),
CONSTRAINT UC_Key UNIQUE (keyString)
);
CREATE TABLE IF NOT EXISTS administration.api_keys_history
(
LIKE administration.api_keys
);
CREATE TRIGGER api_keys_history_trigger
BEFORE INSERT OR UPDATE OR DELETE
ON administration.api_keys
FOR EACH ROW
EXECUTE FUNCTION public.history_trigger_function();

View File

@@ -0,0 +1,105 @@
CREATE SCHEMA IF NOT EXISTS permission;
-- Permissions
CREATE TABLE permission.permissions
(
id SERIAL PRIMARY KEY,
name VARCHAR(255) NOT NULL,
description TEXT NULL,
-- for history
deleted BOOLEAN NOT NULL DEFAULT FALSE,
editorId INT NULL REFERENCES administration.auth_users (id),
created timestamptz NOT NULL DEFAULT NOW(),
updated timestamptz NOT NULL DEFAULT NOW(),
CONSTRAINT UQ_PermissionName UNIQUE (name)
);
CREATE TABLE permission.permissions_history
(
LIKE permission.permissions
);
CREATE TRIGGER versioning_trigger
BEFORE INSERT OR UPDATE OR DELETE
ON permission.permissions
FOR EACH ROW
EXECUTE PROCEDURE public.history_trigger_function();
-- Roles
CREATE TABLE permission.roles
(
id SERIAL PRIMARY KEY,
name VARCHAR(255) NOT NULL,
description TEXT NULL,
-- for history
deleted BOOLEAN NOT NULL DEFAULT FALSE,
editorId INT NULL REFERENCES administration.auth_users (id),
created timestamptz NOT NULL DEFAULT NOW(),
updated timestamptz NOT NULL DEFAULT NOW(),
CONSTRAINT UQ_RoleName UNIQUE (name)
);
CREATE TABLE permission.roles_history
(
LIKE permission.roles
);
CREATE TRIGGER versioning_trigger
BEFORE INSERT OR UPDATE OR DELETE
ON permission.roles
FOR EACH ROW
EXECUTE PROCEDURE public.history_trigger_function();
-- Role permissions
CREATE TABLE permission.role_permissions
(
id SERIAL PRIMARY KEY,
RoleId INT NOT NULL REFERENCES permission.roles (id) ON DELETE CASCADE,
permissionId INT NOT NULL REFERENCES permission.permissions (id) ON DELETE CASCADE,
-- for history
deleted BOOLEAN NOT NULL DEFAULT FALSE,
editorId INT NULL REFERENCES administration.auth_users (id),
created timestamptz NOT NULL DEFAULT NOW(),
updated timestamptz NOT NULL DEFAULT NOW(),
CONSTRAINT UQ_RolePermission UNIQUE (RoleId, permissionId)
);
CREATE TABLE permission.role_permissions_history
(
LIKE permission.role_permissions
);
CREATE TRIGGER versioning_trigger
BEFORE INSERT OR UPDATE OR DELETE
ON permission.role_permissions
FOR EACH ROW
EXECUTE PROCEDURE public.history_trigger_function();
-- Role user
CREATE TABLE permission.role_users
(
id SERIAL PRIMARY KEY,
RoleId INT NOT NULL REFERENCES permission.roles (id) ON DELETE CASCADE,
UserId INT NOT NULL REFERENCES administration.auth_users (id) ON DELETE CASCADE,
-- for history
deleted BOOLEAN NOT NULL DEFAULT FALSE,
editorId INT NULL REFERENCES administration.auth_users (id),
created timestamptz NOT NULL DEFAULT NOW(),
updated timestamptz NOT NULL DEFAULT NOW(),
CONSTRAINT UQ_RoleUser UNIQUE (RoleId, UserId)
);
CREATE TABLE permission.role_users_history
(
LIKE permission.role_users
);
CREATE TRIGGER versioning_trigger
BEFORE INSERT OR UPDATE OR DELETE
ON permission.role_users
FOR EACH ROW
EXECUTE PROCEDURE public.history_trigger_function();

View File

@@ -0,0 +1,24 @@
CREATE TABLE permission.api_key_permissions
(
id SERIAL PRIMARY KEY,
apiKeyId INT NOT NULL REFERENCES administration.api_keys (id) ON DELETE CASCADE,
permissionId INT NOT NULL REFERENCES permission.permissions (id) ON DELETE CASCADE,
-- for history
deleted BOOLEAN NOT NULL DEFAULT FALSE,
editorId INT NULL REFERENCES administration.auth_users (id),
created timestamptz NOT NULL DEFAULT NOW(),
updated timestamptz NOT NULL DEFAULT NOW(),
CONSTRAINT UQ_ApiKeyPermission UNIQUE (apiKeyId, permissionId)
);
CREATE TABLE permission.api_key_permissions_history
(
LIKE permission.api_key_permissions
);
CREATE TRIGGER versioning_trigger
BEFORE INSERT OR UPDATE OR DELETE
ON permission.api_key_permissions
FOR EACH ROW
EXECUTE PROCEDURE public.history_trigger_function();

View File

@@ -0,0 +1,30 @@
[build-system]
requires = ["setuptools>=70.1.0", "wheel>=0.43.0"]
build-backend = "setuptools.build_meta"
[project]
name = "cpl-auth"
version = "2024.7.0"
description = "CPL auth"
readme ="CPL auth package"
requires-python = ">=3.12"
license = { text = "MIT" }
authors = [
{ name = "Sven Heidemann", email = "sven.heidemann@sh-edraft.de" }
]
keywords = ["cpl", "auth", "backend", "shared", "library"]
dynamic = ["dependencies", "optional-dependencies"]
[project.urls]
Homepage = "https://www.sh-edraft.de"
[tool.setuptools.packages.find]
where = ["."]
include = ["cpl*"]
[tool.setuptools.dynamic]
dependencies = { file = ["requirements.txt"] }
optional-dependencies.dev = { file = ["requirements.dev.txt"] }

View File

@@ -0,0 +1 @@
black==25.1.0

View File

@@ -0,0 +1,4 @@
cpl-core
cpl-dependency
cpl-database
python-keycloak-5.8.1

View File

@@ -1 +0,0 @@

View File

@@ -2,4 +2,6 @@ from abc import ABC
class ConfigurationModelABC(ABC):
pass
r"""
ABC for configuration model classes
"""

View File

@@ -0,0 +1 @@
from .user_context import set_user, get_user

View File

@@ -0,0 +1,18 @@
from contextvars import ContextVar
from typing import Optional
from cpl.auth.auth_logger import AuthLogger
from cpl.auth.schema._administration.auth_user import AuthUser
_user_context: ContextVar[Optional[AuthUser]] = ContextVar("user", default=None)
_logger = AuthLogger(__name__)
def set_user(user_id: Optional[AuthUser]):
_logger.trace("Setting user context", user_id)
_user_context.set(user_id)
def get_user() -> Optional[AuthUser]:
return _user_context.get()

View File

@@ -3,7 +3,7 @@ from socket import gethostname
from typing import Optional, Type
from cpl.core.environment.environment_enum import EnvironmentEnum
from cpl.core.typing import T
from cpl.core.typing import T, D
from cpl.core.utils.get_value import get_value
@@ -55,14 +55,14 @@ class Environment:
os.environ[key] = str(value)
@staticmethod
def get(key: str, cast_type: Type[T], default: Optional[T] = None) -> Optional[T]:
def get(key: str, cast_type: Type[T], default: D = None) -> T | D:
"""
Get an environment variable and cast it to a specified type.
:param str key: The name of the environment variable.
:param Type[T] cast_type: A callable to cast the variable's value.
:param Optional[T] default: The default value to return if the variable is not found. Defaults to None.The default value to return if the variable is not found. Defaults to None.
:param T default: The default value to return if the variable is not found. Defaults to None.The default value to return if the variable is not found. Defaults to None.
:return: The casted value, or None if the variable is not found.
:rtype: Optional[T]
:rtype: T | D
"""
return get_value(dict(os.environ), key, cast_type, default)

View File

@@ -1,4 +1,4 @@
from .logger import Logger
from .logger_abc import LoggerABC
from .log_level_enum import LogLevelEnum
from .log_level_enum import LogLevel
from .logging_settings import LogSettings

View File

@@ -1,7 +1,7 @@
from enum import Enum
class LogLevelEnum(Enum):
class LogLevel(Enum):
off = "OFF" # Nothing
trace = "TRC" # Detailed app information's
debug = "DEB" # Detailed app state

View File

@@ -3,28 +3,31 @@ import traceback
from datetime import datetime
from cpl.core.console import Console
from cpl.core.log.log_level_enum import LogLevelEnum
from cpl.core.log.log_level_enum import LogLevel
from cpl.core.log.logger_abc import LoggerABC
from cpl.core.typing import Messages, Source
class Logger(LoggerABC):
_level = LogLevelEnum.info
_levels = [x for x in LogLevelEnum]
_level = LogLevel.info
_levels = [x for x in LogLevel]
# ANSI color codes for different log levels
_COLORS = {
LogLevelEnum.trace: "\033[37m", # Light Gray
LogLevelEnum.debug: "\033[94m", # Blue
LogLevelEnum.info: "\033[92m", # Green
LogLevelEnum.warning: "\033[93m", # Yellow
LogLevelEnum.error: "\033[91m", # Red
LogLevelEnum.fatal: "\033[95m", # Magenta
LogLevel.trace: "\033[37m", # Light Gray
LogLevel.debug: "\033[94m", # Blue
LogLevel.info: "\033[92m", # Green
LogLevel.warning: "\033[93m", # Yellow
LogLevel.error: "\033[91m", # Red
LogLevel.fatal: "\033[95m", # Magenta
}
def __init__(self, source: Source, file_prefix: str = None):
LoggerABC.__init__(self)
assert source is not None and source != "", "Source cannot be None or empty"
if source == LoggerABC.__name__:
source = None
self._source = source
if file_prefix is None:
@@ -45,7 +48,7 @@ class Logger(LoggerABC):
os.makedirs("logs")
@classmethod
def set_level(cls, level: LogLevelEnum):
def set_level(cls, level: LogLevel):
if level in cls._levels:
cls._level = level
else:
@@ -69,7 +72,7 @@ class Logger(LoggerABC):
log_file.write(content + "\n")
log_file.close()
def _log(self, level: LogLevelEnum, *messages: Messages):
def _log(self, level: LogLevel, *messages: Messages):
try:
if self._levels.index(level) < self._levels.index(self._level):
return
@@ -78,7 +81,7 @@ class Logger(LoggerABC):
formatted_message = self._format_message(level.value, timestamp, *messages)
self._write_log_to_file(formatted_message)
Console.write_line(f"{self._COLORS.get(self._level, '\033[0m')}{formatted_message}\033[0m")
Console.write_line(f"{self._COLORS.get(level, '\033[0m')}{formatted_message}\033[0m")
except Exception as e:
print(f"Error while logging: {e} -> {traceback.format_exc()}")
@@ -91,27 +94,35 @@ class Logger(LoggerABC):
messages = [str(message) for message in messages if message is not None]
return f"<{timestamp}> [{level.upper():^3}] [{self._file_prefix}] - [{self._source}]: {' '.join(messages)}"
message = f"<{timestamp}>"
message += f" [{level.upper():^3}]"
message += f" [{self._file_prefix}]"
if self._source is not None:
message += f" - [{self._source}]"
message += f": {' '.join(messages)}"
return message
def header(self, string: str):
self._log(LogLevelEnum.info, string)
self._log(LogLevel.info, string)
def trace(self, *messages: Messages):
self._log(LogLevelEnum.trace, *messages)
self._log(LogLevel.trace, *messages)
def debug(self, *messages: Messages):
self._log(LogLevelEnum.debug, *messages)
self._log(LogLevel.debug, *messages)
def info(self, *messages: Messages):
self._log(LogLevelEnum.info, *messages)
self._log(LogLevel.info, *messages)
def warning(self, *messages: Messages):
self._log(LogLevelEnum.warning, *messages)
self._log(LogLevel.warning, *messages)
def error(self, message, e: Exception = None):
self._log(LogLevelEnum.error, message, f"{e} -> {traceback.format_exc()}" if e else None)
self._log(LogLevel.error, message, f"{e} -> {traceback.format_exc()}" if e else None)
def fatal(self, message, e: Exception = None, prevent_quit: bool = False):
self._log(LogLevelEnum.fatal, message, f"{e} -> {traceback.format_exc()}" if e else None)
self._log(LogLevel.fatal, message, f"{e} -> {traceback.format_exc()}" if e else None)
if not prevent_quit:
exit(-1)

View File

@@ -1,5 +1,6 @@
from abc import abstractmethod, ABC
from cpl.core.log.log_level_enum import LogLevel
from cpl.core.typing import Messages
@@ -7,12 +8,10 @@ class LoggerABC(ABC):
r"""ABC for :class:`cpl.core.log.logger_service.Logger`"""
@abstractmethod
def set_level(self, level: str):
pass
def set_level(self, level: LogLevel): ...
@abstractmethod
def _format_message(self, level: str, timestamp, *messages: Messages) -> str:
pass
def _format_message(self, level: str, timestamp, *messages: Messages) -> str: ...
@abstractmethod
def header(self, string: str):

View File

@@ -1,7 +1,7 @@
from typing import Optional
from cpl.core.configuration.configuration_model_abc import ConfigurationModelABC
from cpl.core.log.log_level_enum import LogLevelEnum
from cpl.core.log.log_level_enum import LogLevel
class LogSettings(ConfigurationModelABC):
@@ -11,14 +11,14 @@ class LogSettings(ConfigurationModelABC):
self,
path: str = None,
filename: str = None,
console_log_level: LogLevelEnum = None,
file_log_level: LogLevelEnum = None,
console_log_level: LogLevel = None,
file_log_level: LogLevel = None,
):
ConfigurationModelABC.__init__(self)
self._path: Optional[str] = path
self._filename: Optional[str] = filename
self._console: Optional[LogLevelEnum] = console_log_level
self._level: Optional[LogLevelEnum] = file_log_level
self._console: Optional[LogLevel] = console_log_level
self._level: Optional[LogLevel] = file_log_level
@property
def path(self) -> str:
@@ -37,17 +37,17 @@ class LogSettings(ConfigurationModelABC):
self._filename = filename
@property
def console(self) -> LogLevelEnum:
def console(self) -> LogLevel:
return self._console
@console.setter
def console(self, console: LogLevelEnum) -> None:
def console(self, console: LogLevel) -> None:
self._console = console
@property
def level(self) -> LogLevelEnum:
def level(self) -> LogLevel:
return self._level
@level.setter
def level(self, level: LogLevelEnum) -> None:
def level(self, level: LogLevel) -> None:
self._level = level

View File

@@ -7,10 +7,8 @@ from cpl.core.typing import T
class PipeABC(ABC, Generic[T]):
@staticmethod
@abstractmethod
def to_str(value: T, *args) -> str:
pass
def to_str(value: T, *args) -> str: ...
@staticmethod
@abstractmethod
def from_str(value: str, *args) -> T:
pass
def from_str(value: str, *args) -> T: ...

View File

@@ -1,3 +1,4 @@
from enum import Enum
from typing import Type, Optional
from cpl.core.typing import T
@@ -40,6 +41,19 @@ def get_value(
if cast_type == bool:
return value.lower() in ["true", "1"]
if issubclass(cast_type, Enum):
try:
return cast_type(value)
except ValueError:
pass
try:
return cast_type[value]
except KeyError:
pass
return default
if (cast_type if not hasattr(cast_type, "__origin__") else cast_type.__origin__) == list:
if not (value.startswith("[") and value.endswith("]")) and list_delimiter not in value:
raise ValueError("List values must be enclosed in square brackets or use a delimiter.")

View File

@@ -1,18 +1,39 @@
from typing import Type
from cpl.application.abc import ApplicationABC as _ApplicationABC
from cpl.dependency import ServiceCollection as _ServiceCollection
from . import mysql as _mysql
from . import postgres as _postgres
from .internal_tables import InternalTables
from .table_manager import TableManager
def _add(collection: _ServiceCollection,db_context: Type, default_port: int, server_type: str):
def _with_migrations(self: _ApplicationABC, *paths: list[str]) -> _ApplicationABC:
from cpl.application.host import Host
from cpl.database.service.migration_service import MigrationService
migration_service = self._services.get_service(MigrationService)
migration_service.with_directory("./scripts")
Host.run(migration_service.migrate)
return self
def _with_seeders(self: _ApplicationABC) -> _ApplicationABC:
from cpl.database.service.seeder_service import SeederService
from cpl.application.host import Host
seeder_service: SeederService = self._services.get_service(SeederService)
Host.run(seeder_service.seed)
return self
def _add(collection: _ServiceCollection, db_context: Type, default_port: int, server_type: str):
from cpl.core.console import Console
from cpl.core.configuration import Configuration
from cpl.database.abc.db_context_abc import DBContextABC
from cpl.database.model.server_type import ServerTypes, ServerType
from cpl.database.model.database_settings import DatabaseSettings
from cpl.database.service.migration_service import MigrationService
from cpl.database.service.seeder_service import SeederService
from cpl.database.schema.executed_migration_dao import ExecutedMigrationDao
try:
@@ -22,20 +43,26 @@ def _add(collection: _ServiceCollection,db_context: Type, default_port: int, ser
collection.add_singleton(DBContextABC, db_context)
collection.add_singleton(ExecutedMigrationDao)
collection.add_singleton(MigrationService)
collection.add_singleton(SeederService)
except ImportError as e:
Console.error("cpl-database is not installed", str(e))
def add_mysql(collection: _ServiceCollection):
from cpl.database.mysql.db_context import DBContext
from cpl.database.model import ServerTypes
_add(collection, DBContext, 3306, ServerTypes.MYSQL.value)
def add_postgres(collection: _ServiceCollection):
from cpl.database.mysql.db_context import DBContext
from cpl.database.model import ServerTypes
_add(collection, DBContext, 5432, ServerTypes.POSTGRES.value)
_ServiceCollection.with_module(add_mysql, _mysql.__name__)
_ServiceCollection.with_module(add_postgres, _postgres.__name__)
_ApplicationABC.extend(_ApplicationABC.with_migrations, _with_migrations)
_ApplicationABC.extend(_ApplicationABC.with_seeders, _with_seeders)

View File

@@ -9,18 +9,15 @@ class ConnectionABC(ABC):
r"""ABC for the :class:`cpl.database.connection.database_connection.DatabaseConnection`"""
@abstractmethod
def __init__(self):
pass
def __init__(self): ...
@property
@abstractmethod
def server(self) -> MySQLConnectionAbstract:
pass
def server(self) -> MySQLConnectionAbstract: ...
@property
@abstractmethod
def cursor(self) -> MySQLCursorBuffered:
pass
def cursor(self) -> MySQLCursorBuffered: ...
@abstractmethod
def connect(self, database_settings: DatabaseSettings):

View File

@@ -4,13 +4,14 @@ from enum import Enum
from types import NoneType
from typing import Generic, Optional, Union, Type, List, Any
from cpl.core.ctx import get_user
from cpl.core.typing import T, Id
from cpl.core.utils import String
from cpl.core.utils.get_value import get_value
from cpl.database._external_data_temp_table_builder import ExternalDataTempTableBuilder
from cpl.database.abc.db_context_abc import DBContextABC
from cpl.database.const import DATETIME_FORMAT
from cpl.database.db_logger import DBLogger
from cpl.database.external_data_temp_table_builder import ExternalDataTempTableBuilder
from cpl.database.postgres.sql_select_builder import SQLSelectBuilder
from cpl.database.typing import T_DBM, Attribute, AttributeFilters, AttributeSorts
@@ -21,7 +22,7 @@ class DataAccessObjectABC(ABC, Generic[T_DBM]):
def __init__(self, source: str, model_type: Type[T_DBM], table_name: str):
from cpl.dependency.service_provider_abc import ServiceProviderABC
self._db = ServiceProviderABC.get_global_provider().get_service(DBContextABC)
self._db = ServiceProviderABC.get_global_service(DBContextABC)
self._logger = DBLogger(source)
self._model_type = model_type
@@ -867,9 +868,9 @@ class DataAccessObjectABC(ABC, Generic[T_DBM]):
@staticmethod
async def _get_editor_id(obj: T_DBM):
editor_id = obj.editor_id
# if editor_id is None:
# user = get_user()
# if user is not None:
# editor_id = user.id
if editor_id is None:
user = get_user()
if user is not None:
editor_id = user.id
return editor_id if editor_id is not None else "NULL"

View File

@@ -0,0 +1,7 @@
from abc import ABC, abstractmethod
class DataSeederABC(ABC):
@abstractmethod
async def seed(self): ...

View File

@@ -2,9 +2,9 @@ from abc import abstractmethod
from datetime import datetime
from typing import Type
from cpl.database import TableManager
from cpl.database.abc.data_access_object_abc import DataAccessObjectABC
from cpl.database.abc.db_model_abc import DbModelABC
from cpl.database.internal_tables import InternalTables
class DbModelDaoABC[T_DBM](DataAccessObjectABC[T_DBM]):
@@ -15,10 +15,10 @@ class DbModelDaoABC[T_DBM](DataAccessObjectABC[T_DBM]):
self.attribute(DbModelABC.id, int, ignore=True)
self.attribute(DbModelABC.deleted, bool)
self.attribute(DbModelABC.editor_id, int, ignore=True) # handled by db trigger
self.attribute(DbModelABC.editor_id, int, db_name="editorId", ignore=True) # handled by db trigger
self.reference(
"editor", "id", DbModelABC.editor_id, InternalTables.users
"editor", "id", DbModelABC.editor_id, TableManager.get("auth_users")
) # not relevant for updates due to editor_id
self.attribute(DbModelABC.created, datetime, ignore=True) # handled by db trigger

View File

@@ -1,37 +0,0 @@
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Optional
class TableABC(ABC):
@abstractmethod
def __init__(self):
self._created_at: Optional[datetime] = datetime.now().isoformat()
self._modified_at: Optional[datetime] = datetime.now().isoformat()
@property
def created_at(self) -> datetime:
return self._created_at
@property
def modified_at(self) -> datetime:
return self._modified_at
@modified_at.setter
def modified_at(self, value: datetime):
self._modified_at = value
@property
@abstractmethod
def insert_string(self) -> str:
pass
@property
@abstractmethod
def udpate_string(self) -> str:
pass
@property
@abstractmethod
def delete_string(self) -> str:
pass

View File

@@ -1,15 +0,0 @@
from cpl.database.model.server_type import ServerTypes, ServerType
class InternalTables:
@classmethod
@property
def users(cls) -> str:
return "administration.users" if ServerType.server_type is ServerTypes.POSTGRES else "users"
@classmethod
@property
def executed_migrations(cls) -> str:
return "system._executed_migrations" if ServerType.server_type is ServerTypes.POSTGRES else "_executed_migrations"

View File

@@ -5,6 +5,7 @@ class ServerTypes(Enum):
POSTGRES = "postgres"
MYSQL = "mysql"
class ServerType:
_server_type: ServerTypes = None
@@ -18,4 +19,4 @@ class ServerType:
@property
def server_type(cls) -> ServerTypes:
assert cls._server_type is not None, "Server type is not set"
return cls._server_type
return cls._server_type

View File

@@ -31,7 +31,6 @@ class MySQLPool:
db=self._db_settings.database,
minsize=1,
maxsize=Environment.get("DB_POOL_SIZE", int, 1),
autocommit=True,
)
except Exception as e:
_logger.fatal("Failed to connect to the database", e)
@@ -62,6 +61,7 @@ class MySQLPool:
async with pool.acquire() as con:
async with con.cursor() as cursor:
await self._exec_sql(cursor, query, args, multi)
await con.commit()
if cursor.description is not None: # Query returns rows
res = await cursor.fetchall()

View File

@@ -1,6 +1,6 @@
from typing import Optional, Union
from cpl.database._external_data_temp_table_builder import ExternalDataTempTableBuilder
from cpl.database.external_data_temp_table_builder import ExternalDataTempTableBuilder
class SQLSelectBuilder:

View File

@@ -1,4 +1,4 @@
from cpl.database import InternalTables
from cpl.database import TableManager
from cpl.database.abc.data_access_object_abc import DataAccessObjectABC
from cpl.database.db_logger import DBLogger
from cpl.database.schema.executed_migration import ExecutedMigration
@@ -9,6 +9,6 @@ _logger = DBLogger(__name__)
class ExecutedMigrationDao(DataAccessObjectABC[ExecutedMigration]):
def __init__(self):
DataAccessObjectABC.__init__(self, __name__, ExecutedMigration, InternalTables.executed_migrations)
DataAccessObjectABC.__init__(self, __name__, ExecutedMigration, TableManager.get("executed_migrations"))
self.attribute(ExecutedMigration.migration_id, str, primary_key=True, db_name="migrationId")

View File

@@ -1,4 +1,4 @@
CREATE TABLE IF NOT EXISTS _executed_migrations
CREATE TABLE IF NOT EXISTS system__executed_migrations
(
migrationId VARCHAR(255) PRIMARY KEY,
created TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,

View File

@@ -1,26 +1,21 @@
DROP TRIGGER IF EXISTS `TR_TableUpdate`;
DELIMITER //
CREATE TRIGGER mytable_before_update
BEFORE UPDATE
ON mytable
CREATE TRIGGER `TR_TableUpdate`
AFTER UPDATE
ON `Table`
FOR EACH ROW
BEGIN
INSERT INTO mytable_history
SELECT OLD.*;
SET NEW.updated = NOW();
INSERT INTO `TableHistory` (Id, ..., Deleted, EditorId, Created, Updated)
VALUES (OLD.Id, ..., OLD.Deleted, OLD.Created, CURRENT_TIMESTAMP());
END;
//
DELIMITER ;
DELIMITER //
CREATE TRIGGER mytable_before_delete
BEFORE DELETE
ON mytable
DROP TRIGGER IF EXISTS `TR_TableDelete`;
CREATE TRIGGER `TR_TableDelete`
AFTER DELETE
ON `Table`
FOR EACH ROW
BEGIN
INSERT INTO mytable_history
SELECT OLD.*;
END;
//
DELIMITER ;
INSERT INTO `TableHistory` (Id, ..., Deleted, EditorId, Created, Updated)
VALUES (OLD.Id, ..., TRUE, OLD.Created, CURRENT_TIMESTAMP());
END;

View File

@@ -3,9 +3,9 @@ CREATE SCHEMA IF NOT EXISTS system;
CREATE TABLE IF NOT EXISTS system._executed_migrations
(
MigrationId VARCHAR(255) PRIMARY KEY,
Created timestamptz NOT NULL DEFAULT NOW(),
Updated timestamptz NOT NULL DEFAULT NOW()
migrationId VARCHAR(255) PRIMARY KEY,
created timestamptz NOT NULL DEFAULT NOW(),
updated timestamptz NOT NULL DEFAULT NOW()
);
CREATE OR REPLACE FUNCTION public.history_trigger_function()
@@ -33,9 +33,9 @@ BEGIN
USING OLD;
END IF;
-- For UPDATE, update the Updated column and return the new row
-- For UPDATE, update the updated column and return the new row
IF (TG_OP = 'UPDATE') THEN
NEW.updated := NOW(); -- Update the Updated column
NEW.updated := NOW(); -- Update the updated column
RETURN NEW;
END IF;

View File

@@ -0,0 +1,18 @@
from cpl.database.abc.data_seeder_abc import DataSeederABC
from cpl.database.db_logger import DBLogger
from cpl.dependency import ServiceProviderABC
_logger = DBLogger(__name__)
class SeederService:
def __init__(self, provider: ServiceProviderABC):
self._provider = provider
async def seed(self):
seeders = self._provider.get_services(DataSeederABC)
_logger.debug(f"Found {len(seeders)} seeders")
for seeder in seeders:
await seeder.seed()

View File

@@ -0,0 +1,49 @@
from cpl.database.model.server_type import ServerTypes, ServerType
class TableManager:
_tables: dict[str, dict[ServerType, str]] = {
"executed_migrations": {
ServerTypes.POSTGRES: "system._executed_migrations",
ServerTypes.MYSQL: "system__executed_migrations",
},
"auth_users": {
ServerTypes.POSTGRES: "administration.auth_users",
ServerTypes.MYSQL: "administration_auth_users",
},
"api_keys": {
ServerTypes.POSTGRES: "administration.api_keys",
ServerTypes.MYSQL: "administration_api_keys",
},
"api_key_permissions": {
ServerTypes.POSTGRES: "permission.api_key_permissions",
ServerTypes.MYSQL: "permission_api_key_permissions",
},
"permissions": {
ServerTypes.POSTGRES: "permission.permissions",
ServerTypes.MYSQL: "permission_permissions",
},
"roles": {
ServerTypes.POSTGRES: "permission.roles",
ServerTypes.MYSQL: "permission_roles",
},
"role_permissions": {
ServerTypes.POSTGRES: "permission.role_permissions",
ServerTypes.MYSQL: "permission_role_permissions",
},
"role_users": {
ServerTypes.POSTGRES: "permission.role_users",
ServerTypes.MYSQL: "permission_role_users",
},
}
@classmethod
def get(cls, key: str) -> str:
if key not in cls._tables:
raise KeyError(f"Table '{key}' not found in TableManager.")
server_type = ServerType.server_type
if server_type not in cls._tables[key]:
raise KeyError(f"Server type '{server_type}' not configured for table '{key}'.")
return cls._tables[key][server_type]

View File

@@ -4,8 +4,7 @@ from abc import ABC, abstractmethod
class ScopeABC(ABC):
r"""ABC for the class :class:`cpl.dependency.scope.Scope`"""
def __init__(self):
pass
def __init__(self): ...
@property
@abstractmethod

View File

@@ -1,8 +1,7 @@
from typing import Union, Type, Callable, Optional
from typing import Union, Type, Callable
from cpl.core.log.logger import Logger
from cpl.core.log.logger_abc import LoggerABC
from cpl.core.pipes.pipe_abc import PipeABC
from cpl.core.typing import T, Service
from cpl.dependency.service_descriptor import ServiceDescriptor
from cpl.dependency.service_lifetime_enum import ServiceLifetimeEnum
@@ -46,29 +45,6 @@ class ServiceCollection:
return self
def add_module(self, module: str | object):
if not isinstance(module, str):
module = module.__name__
if module not in self._modules:
raise ValueError(f"Module {module} not found")
self._modules[module](self)
# def add_mysql(self, db_context_type: Type[DatabaseContextABC], db_settings: DatabaseSettings):
# self.add_singleton(DatabaseContextABC, db_context_type)
# self._database_context = self.build_service_provider().get_service(DatabaseContextABC)
# self._database_context.connect(db_settings)
def add_logging(self):
self.add_transient(LoggerABC, Logger)
return self
def add_pipes(self):
for pipe in PipeABC.__subclasses__():
self.add_transient(PipeABC, pipe)
return self
def add_singleton(self, service_type: T, service: Service = None):
self._add_descriptor_by_lifetime(service_type, ServiceLifetimeEnum.singleton, service)
return self
@@ -81,7 +57,20 @@ class ServiceCollection:
self._add_descriptor_by_lifetime(service_type, ServiceLifetimeEnum.transient, service)
return self
def build_service_provider(self) -> ServiceProviderABC:
def build(self) -> ServiceProviderABC:
sp = ServiceProvider(self._service_descriptors)
ServiceProviderABC.set_global_provider(sp)
return sp
def add_module(self, module: str | object):
if not isinstance(module, str):
module = module.__name__
if module not in self._modules:
raise ValueError(f"Module {module} not found")
self._modules[module](self)
def add_logging(self):
self.add_transient(LoggerABC, Logger)
return self

View File

@@ -59,7 +59,7 @@ class ServiceProvider(ServiceProviderABC):
# raise Exception(f'Service {parameter.annotation} not found')
def _get_services(self, t: type, *args, service_type: type = None, **kwargs) -> list[Optional[object]]:
def _get_services(self, t: type, service_type: type = None, **kwargs) -> list[Optional[object]]:
implementations = []
for descriptor in self._service_descriptors:
if descriptor.service_type == t or issubclass(descriptor.service_type, t):
@@ -67,7 +67,9 @@ class ServiceProvider(ServiceProviderABC):
implementations.append(descriptor.implementation)
continue
implementation = self._build_service(descriptor.service_type, *args, service_type, **kwargs)
implementation = self._build_service(
descriptor.service_type, origin_service_type=service_type, **kwargs
)
if descriptor.lifetime == ServiceLifetimeEnum.singleton:
descriptor.implementation = implementation
@@ -93,7 +95,8 @@ class ServiceProvider(ServiceProviderABC):
params.append(Environment)
elif issubclass(parameter.annotation, ConfigurationModelABC):
params.append(Configuration.get(parameter.annotation))
conf = Configuration.get(parameter.annotation)
params.append(parameter.annotation() if conf is None else conf)
elif issubclass(parameter.annotation, Configuration):
params.append(Configuration)
@@ -133,7 +136,7 @@ class ServiceProvider(ServiceProviderABC):
else:
descriptors.append(copy.deepcopy(descriptor))
sb = ScopeBuilder(ServiceProvider(descriptors, self._database_context))
sb = ScopeBuilder(ServiceProvider(descriptors))
return sb.build()
def get_service(self, service_type: T, *args, **kwargs) -> Optional[R]:

View File

@@ -1,10 +1,10 @@
import functools
from abc import abstractmethod, ABC
from inspect import Signature, signature
from typing import Optional
from typing import Optional, Type
from cpl.dependency.scope_abc import ScopeABC
from cpl.core.typing import T, R
from cpl.dependency.scope_abc import ScopeABC
class ServiceProviderABC(ABC):
@@ -13,8 +13,7 @@ class ServiceProviderABC(ABC):
_provider: Optional["ServiceProviderABC"] = None
@abstractmethod
def __init__(self):
pass
def __init__(self): ...
@classmethod
def set_global_provider(cls, provider: "ServiceProviderABC"):
@@ -24,9 +23,20 @@ class ServiceProviderABC(ABC):
def get_global_provider(cls) -> Optional["ServiceProviderABC"]:
return cls._provider
@classmethod
def get_global_service(cls, instance_type: T, *args, **kwargs) -> Optional[R]:
if cls._provider is None:
return None
return cls._provider.get_service(instance_type, *args, **kwargs)
@classmethod
def get_global_services(cls, instance_type: T, *args, **kwargs) -> list[Optional[R]]:
if cls._provider is None:
return []
return cls._provider.get_services(instance_type, *args, **kwargs)
@abstractmethod
def _build_by_signature(self, sig: Signature, origin_service_type: type) -> list[R]:
pass
def _build_by_signature(self, sig: Signature, origin_service_type: type) -> list[R]: ...
@abstractmethod
def _build_service(self, service_type: type, *args, **kwargs) -> object:
@@ -62,7 +72,7 @@ class ServiceProviderABC(ABC):
"""
@abstractmethod
def get_service(self, instance_type: T, *args, **kwargs) -> Optional[R]:
def get_service(self, instance_type: Type[T], *args, **kwargs) -> Optional[T]:
r"""Returns instance of given type
Parameter
@@ -76,7 +86,7 @@ class ServiceProviderABC(ABC):
"""
@abstractmethod
def get_services(self, service_type: T, *args, **kwargs) -> list[Optional[R]]:
def get_services(self, service_type: Type[T], *args, **kwargs) -> list[Optional[T]]:
r"""Returns instance of given type
Parameter

View File

@@ -1 +1,2 @@
cpl-core
cpl-core
cpl-dependency

View File

@@ -5,25 +5,19 @@ from cpl.translation.translation_settings import TranslationSettings
class TranslationServiceABC(ABC):
@abstractmethod
def __init__(self):
pass
def __init__(self): ...
@abstractmethod
def set_default_lang(self, lang: str):
pass
def set_default_lang(self, lang: str): ...
@abstractmethod
def set_lang(self, lang: str):
pass
def set_lang(self, lang: str): ...
@abstractmethod
def load(self, lang: str):
pass
def load(self, lang: str): ...
@abstractmethod
def load_by_settings(self, settings: TranslationSettings):
pass
def load_by_settings(self, settings: TranslationSettings): ...
@abstractmethod
def translate(self, key: str) -> str:
pass
def translate(self, key: str) -> str: ...

View File

@@ -1 +1,2 @@
cpl-core
cpl-core
cpl-dependency

View File

@@ -1,15 +0,0 @@
{
"TimeFormatSettings": {
"DateFormat": "%Y-%m-%d",
"TimeFormat": "%H:%M:%S",
"DateTimeFormat": "%Y-%m-%d %H:%M:%S.%f",
"DateTimeLogFormat": "%Y-%m-%d_%H-%M-%S"
},
"LoggingSettings": {
"Path": "logs/",
"Filename": "log_$start_time.log",
"ConsoleLogLevel": "ERROR",
"FileLogLevel": "WARN"
}
}

View File

@@ -1,9 +0,0 @@
{
"WorkspaceSettings": {
"DefaultProject": "async",
"Projects": {
"async": "src/async/async.json"
},
"Scripts": {}
}
}

View File

@@ -1,15 +0,0 @@
from cpl.application import ApplicationABC
from cpl.core.configuration import ConfigurationABC
from cpl.core.console import Console
from cpl.dependency import ServiceProviderABC
class Application(ApplicationABC):
def __init__(self, config: ConfigurationABC, services: ServiceProviderABC):
ApplicationABC.__init__(self, config, services)
async def configure(self):
pass
async def main(self):
Console.write_line("Hello World")

View File

@@ -1,41 +0,0 @@
{
"ProjectSettings": {
"Name": "async",
"Version": {
"Major": "0",
"Minor": "0",
"Micro": "0"
},
"Author": "",
"AuthorEmail": "",
"Description": "",
"LongDescription": "",
"URL": "",
"CopyrightDate": "",
"CopyrightName": "",
"LicenseName": "",
"LicenseDescription": "",
"Dependencies": [
"sh_cpl>=2021.10.0.post1"
],
"PythonVersion": ">=3.9.2",
"PythonPath": {},
"Classifiers": []
},
"BuildSettings": {
"ProjectType": "console",
"SourcePath": "",
"OutputPath": "../../dist",
"Main": "async.main",
"EntryPoint": "async",
"IncludePackageData": false,
"Included": [],
"Excluded": [
"*/__pycache__",
"*/logs",
"*/tests"
],
"PackageData": {},
"ProjectReferences": []
}
}

View File

@@ -1,17 +0,0 @@
import asyncio
from cpl.application import ApplicationBuilder
from application import Application
from startup import Startup
async def main():
app_builder = ApplicationBuilder(Application)
app_builder.use_startup(Startup)
app = await app_builder.build_async()
await app.run_async()
if __name__ == "__main__":
loop = asyncio.new_event_loop()
loop.run_until_complete(main())

View File

@@ -1,17 +0,0 @@
from cpl.application.async_startup_abc import AsyncStartupABC
from cpl.core.configuration import ConfigurationABC
from cpl.dependency import ServiceProviderABC, ServiceCollection
from cpl.core.environment import Environment
class Startup(AsyncStartupABC):
def __init__(self):
AsyncStartupABC.__init__(self)
async def configure_configuration(
self, configuration: ConfigurationABC, environment: Environment
) -> ConfigurationABC:
return configuration
async def configure_services(self, services: ServiceCollection, environment: Environment) -> ServiceProviderABC:
return services.build_service_provider()

View File

@@ -15,32 +15,27 @@ def test_console():
Console.write_line("Hello World")
Console.write("\nName: ")
Console.write_line(" Hello", Console.read_line())
Console.clear()
Console.write_at(5, 5, "at 5, 5")
Console.write_at(10, 10, "at 10, 10")
if __name__ == "__main__":
Console.write_line("Hello World\n")
Console.clear()
Console.spinner(
"Test:", test_spinner, spinner_foreground_color=ForegroundColorEnum.cyan, text_foreground_color="green"
)
test_console()
Console.write_line("HOLD BACK")
# opts = [
# 'Option 1',
# 'Option 2',
# 'Option 3',
# 'Option 4'
# ]
# selected = Console.select(
# '>',
# 'Select item:',
# opts,
# header_foreground_color=ForegroundColorEnum.blue,
# option_foreground_color=ForegroundColorEnum.green,
# cursor_foreground_color=ForegroundColorEnum.red
# )
# Console.write_line(f'You selected: {selected}')
# # test_console()
#
# Console.write_line()
opts = ["Option 1", "Option 2", "Option 3", "Option 4"]
selected = Console.select(
">",
"Select item:",
opts,
header_foreground_color=ForegroundColorEnum.blue,
option_foreground_color=ForegroundColorEnum.green,
cursor_foreground_color=ForegroundColorEnum.red,
)
Console.write_line(f"You selected: {selected}")
Console.write_line()

View File

@@ -1,43 +1,39 @@
from typing import Optional
from cpl.application import ApplicationABC
from cpl.application.abc.application_abc import ApplicationABC
from cpl.auth.keycloak import KeycloakAdmin
from cpl.core.console import Console
from cpl.core.environment import Environment
from cpl.core.log import LoggerABC
from cpl.dependency import ServiceProviderABC
from model.city import City
from model.city_dao import CityDao
from model.user import User
from model.user_dao import UserDao
from model.user_repo import UserRepo
from model.user_repo_abc import UserRepoABC
class Application(ApplicationABC):
def __init__(self, services: ServiceProviderABC):
ApplicationABC.__init__(self, services)
self._logger: Optional[LoggerABC] = None
async def test_repos(self):
user_repo: UserRepo = self._services.get_service(UserRepoABC)
if len(await user_repo.get_users()) == 0:
user_repo.add_test_user()
Console.write_line("Users:")
for user in await user_repo.get_users():
Console.write_line(user.UserId, user.Name, user.City)
Console.write_line("Cities:")
for city in await user_repo.get_cities():
Console.write_line(city.CityId, city.Name, city.ZIP)
self._logger = services.get_service(LoggerABC)
async def test_daos(self):
userDao: UserDao = self._services.get_service(UserDao)
cityDao: CityDao = self._services.get_service(CityDao)
Console.write_line(await userDao.get_all())
async def configure(self):
self._logger = self._services.get_service(LoggerABC)
if len(await cityDao.get_all()) == 0:
city_id = await cityDao.create(City(0, "Haren", "49733"))
await userDao.create(User(0, "NewUser", city_id))
Console.write_line(await userDao.get_all())
async def main(self):
self._logger.debug(f"Host: {Environment.get_host_name()}")
self._logger.debug(f"Environment: {Environment.get_environment()}")
await self.test_daos()
kc_admin: KeycloakAdmin = self._services.get_service(KeycloakAdmin)
x = kc_admin.get_users()
Console.write_line(x)

Some files were not shown because too many files have changed in this diff Show More