Compare commits

...

6 Commits

Author SHA1 Message Date
4625b626e6 Added dao base
All checks were successful
Build on push / prepare (push) Successful in 8s
Build on push / query (push) Successful in 16s
Build on push / core (push) Successful in 23s
Build on push / translation (push) Successful in 14s
Build on push / mail (push) Successful in 14s
2025-09-17 00:12:25 +02:00
58dbd3ed1e Cleanup for mysql
All checks were successful
Build on push / prepare (push) Successful in 8s
Build on push / core (push) Successful in 19s
Build on push / query (push) Successful in 18s
Build on push / translation (push) Successful in 15s
Build on push / mail (push) Successful in 17s
2025-09-16 20:21:33 +02:00
cd7dfaf2b4 Fixed spinner thread & remove log thread
All checks were successful
Build on push / prepare (push) Successful in 8s
Build on push / core (push) Successful in 18s
Build on push / query (push) Successful in 19s
Build on push / translation (push) Successful in 18s
Build on push / mail (push) Successful in 18s
2025-09-16 18:41:57 +02:00
8ad3e3bdb4 Removed ConfigModel from_dict
All checks were successful
Build on push / prepare (push) Successful in 8s
Build on push / core (push) Successful in 16s
Build on push / query (push) Successful in 17s
Build on push / mail (push) Successful in 17s
Build on push / translation (push) Successful in 18s
2025-09-16 08:51:56 +02:00
b97bc0a3ed Restructuring
All checks were successful
Build on push / prepare (push) Successful in 8s
Build on push / query (push) Successful in 17s
Build on push / core (push) Successful in 26s
Build on push / translation (push) Successful in 15s
Build on push / mail (push) Successful in 17s
2025-09-16 08:48:07 +02:00
5f25400bcd Updated config & environment 2025-09-16 08:39:00 +02:00
228 changed files with 3011 additions and 3112 deletions

153
README.md
View File

@@ -1,153 +0,0 @@
<h1 align="center">CPL - Common python library</h1>
<!-- Summary -->
<p align="center">
<!-- <img src="" alt="cpl-logo" width="120px" height="120px"/> -->
<br>
<i>
CPL is a development platform for python server applications
<br>using Python.</i>
<br>
</p>
## Table of Contents
<!-- TABLE OF CONTENTS -->
<ol>
<li><a href="#Features">Features</a></li>
<li>
<a href="#getting-started">Getting Started</a>
<ul>
<li><a href="#prerequisites">Prerequisites</a></li>
<li><a href="#installation">Installation</a></li>
</ul>
</li>
<li><a href="#roadmap">Roadmap</a></li>
<li><a href="#contributing">Contributing</a></li>
<li><a href="#license">License</a></li>
<li><a href="#contact">Contact</a></li>
</ol>
## Features
<!-- FEATURE OVERVIEW -->
- Expandle
- Application base
- Standardized application classes
- Application object builder
- Application extension classes
- Startup classes
- Startup extension classes
- Configuration
- Configure via object mapped JSON
- Console argument handling
- Console class for in and output
- Banner
- Spinner
- Options (menu)
- Table
- Write
- Write_at
- Write_line
- Write_line_at
- Dependency injection
- Service lifetimes: singleton, scoped and transient
- Providing of application environment
- Environment (development, staging, testing, production)
- Appname
- Customer
- Hostname
- Runtime directory
- Working directory
- Logging
- Standardized logger
- Log-level (FATAL, ERROR, WARN, INFO, DEBUG & TRACE)
- Mail handling
- Send mails
- Pipe classes
- Convert input
- Utils
- Credential manager
- Encryption via BASE64
- PIP wrapper class based on subprocess
- Run pip commands
- String converter to different variants
- to_lower_case
- to_camel_case
- ...
<!-- GETTING STARTED -->
## Getting Started
[Get started with CPL][quickstart].
### Prerequisites
- Install [python] which includes [Pip installs packages][pip]
### Installation
Install the CPL package
```sh
pip install cpl-core --extra-index-url https://pip.sh-edraft.de
```
Install the CPL CLI
```sh
pip install cpl-cli --extra-index-url https://pip.sh-edraft.de
```
Create workspace:
```sh
cpl new <console|library|unittest> <PROJECT NAME>
```
Run the application:
```sh
cd <PROJECT NAME>
cpl start
```
<!-- ROADMAP -->
## Roadmap
See the [open issues](https://git.sh-edraft.de/sh-edraft.de/sh_cpl/issues) for a list of proposed features (and known issues).
<!-- CONTRIBUTING -->
## Contributing
### Contributing Guidelines
Read through our [contributing guidelines][contributing] to learn about our submission process, coding rules and more.
### Want to Help?
Want to file a bug, contribute some code, or improve documentation? Excellent! Read up on our guidelines for [contributing][contributing].
<!-- LICENSE -->
## License
Distributed under the MIT License. See [LICENSE] for more information.
<!-- CONTACT -->
## Contact
Sven Heidemann - sven.heidemann@sh-edraft.de
Project link: [https://git.sh-edraft.de/sh-edraft.de/sh_common_py_lib](https://git.sh-edraft.de/sh-edraft.de/sh_cpl)
<!-- External LINKS -->
[pip_url]: https://pip.sh-edraft.de
[python]: https://www.python.org/
[pip]: https://pypi.org/project/pip/
<!-- Internal LINKS -->
[project]: https://git.sh-edraft.de/sh-edraft.de/sh_cpl
[quickstart]: https://git.sh-edraft.de/sh-edraft.de/sh_cpl/wiki/quickstart
[contributing]: https://git.sh-edraft.de/sh-edraft.de/sh_cpl/wiki/contributing
[license]: LICENSE

View File

@@ -1,10 +1,9 @@
from abc import ABC, abstractmethod
from typing import Optional
from cpl.core.configuration.configuration_abc import ConfigurationABC
from cpl.dependency.service_provider_abc import ServiceProviderABC
from cpl.core.console.console import Console
from cpl.core.dependency_injection.service_provider_abc import ServiceProviderABC
from cpl.core.environment.application_environment_abc import ApplicationEnvironmentABC
class ApplicationABC(ABC):
@@ -13,14 +12,12 @@ class ApplicationABC(ABC):
Parameters:
config: :class:`cpl.core.configuration.configuration_abc.ConfigurationABC`
Contains object loaded from appsettings
services: :class:`cpl.core.dependency_injection.service_provider_abc.ServiceProviderABC`
services: :class:`cpl.dependency.service_provider_abc.ServiceProviderABC`
Contains instances of prepared objects
"""
@abstractmethod
def __init__(self, config: ConfigurationABC, services: ServiceProviderABC):
self._configuration: Optional[ConfigurationABC] = config
self._environment: Optional[ApplicationEnvironmentABC] = self._configuration.environment
def __init__(self, services: ServiceProviderABC):
self._services: Optional[ServiceProviderABC] = services
def run(self):
@@ -49,12 +46,12 @@ class ApplicationABC(ABC):
def configure(self):
r"""Configure the application
Called by :class:`cpl.core.application.application_abc.ApplicationABC.run`
Called by :class:`cpl.application.application_abc.ApplicationABC.run`
"""
@abstractmethod
def main(self):
r"""Custom entry point
Called by :class:`cpl.core.application.application_abc.ApplicationABC.run`
Called by :class:`cpl.application.application_abc.ApplicationABC.run`
"""

View File

@@ -0,0 +1,97 @@
from typing import Type, Optional, Callable, Union
from cpl.application.application_abc import ApplicationABC
from cpl.application.application_builder_abc import ApplicationBuilderABC
from cpl.application.application_extension_abc import ApplicationExtensionABC
from cpl.application.async_application_extension_abc import AsyncApplicationExtensionABC
from cpl.application.async_startup_abc import AsyncStartupABC
from cpl.application.async_startup_extension_abc import AsyncStartupExtensionABC
from cpl.application.startup_abc import StartupABC
from cpl.application.startup_extension_abc import StartupExtensionABC
from cpl.core.configuration.configuration import Configuration
from cpl.dependency.service_collection import ServiceCollection
from cpl.core.environment import Environment
class ApplicationBuilder(ApplicationBuilderABC):
r"""This is class is used to build an object of :class:`cpl.application.application_abc.ApplicationABC`
Parameter:
app: Type[:class:`cpl.application.application_abc.ApplicationABC`]
Application to build
"""
def __init__(self, app: Type[ApplicationABC]):
ApplicationBuilderABC.__init__(self)
self._app = app
self._startup: Optional[StartupABC | AsyncStartupABC] = None
self._services = ServiceCollection()
self._app_extensions: list[Type[ApplicationExtensionABC | AsyncApplicationExtensionABC]] = []
self._startup_extensions: list[Type[StartupExtensionABC | AsyncStartupABC]] = []
def use_startup(self, startup: Type[StartupABC | AsyncStartupABC]) -> "ApplicationBuilder":
self._startup = startup()
return self
def use_extension(
self,
extension: Type[
ApplicationExtensionABC | AsyncApplicationExtensionABC | StartupExtensionABC | AsyncStartupExtensionABC
],
) -> "ApplicationBuilder":
if (
issubclass(extension, ApplicationExtensionABC) or issubclass(extension, AsyncApplicationExtensionABC)
) and extension not in self._app_extensions:
self._app_extensions.append(extension)
elif (
issubclass(extension, StartupExtensionABC) or issubclass(extension, AsyncStartupExtensionABC)
) and extension not in self._startup_extensions:
self._startup_extensions.append(extension)
return self
def _build_startup(self):
for ex in self._startup_extensions:
extension = ex()
extension.configure_configuration(Configuration, Environment)
extension.configure_services(self._services, Environment)
if self._startup is not None:
self._startup.configure_configuration(Configuration, Environment)
self._startup.configure_services(self._services, Environment)
async def _build_async_startup(self):
for ex in self._startup_extensions:
extension = ex()
await extension.configure_configuration(Configuration, Environment)
await extension.configure_services(self._services, Environment)
if self._startup is not None:
await self._startup.configure_configuration(Configuration, Environment)
await self._startup.configure_services(self._services, Environment)
def build(self) -> ApplicationABC:
self._build_startup()
config = Configuration
services = self._services.build_service_provider()
for ex in self._app_extensions:
extension = ex()
extension.run(config, services)
return self._app(services)
async def build_async(self) -> ApplicationABC:
await self._build_async_startup()
config = Configuration
services = self._services.build_service_provider()
for ex in self._app_extensions:
extension = ex()
await extension.run(config, services)
return self._app(services)

View File

@@ -1,12 +1,12 @@
from abc import ABC, abstractmethod
from typing import Type
from cpl.core.application.application_abc import ApplicationABC
from cpl.core.application.startup_abc import StartupABC
from cpl.application.application_abc import ApplicationABC
from cpl.application.startup_abc import StartupABC
class ApplicationBuilderABC(ABC):
r"""ABC for the :class:`cpl.core.application.application_builder.ApplicationBuilder`"""
r"""ABC for the :class:`cpl.application.application_builder.ApplicationBuilder`"""
@abstractmethod
def __init__(self, *args):
@@ -17,7 +17,7 @@ class ApplicationBuilderABC(ABC):
r"""Sets the custom startup class to use
Parameter:
startup: Type[:class:`cpl.core.application.startup_abc.StartupABC`]
startup: Type[:class:`cpl.application.startup_abc.StartupABC`]
Startup class to use
"""
@@ -26,7 +26,7 @@ class ApplicationBuilderABC(ABC):
r"""Sets the custom startup class to use async
Parameter:
startup: Type[:class:`cpl.core.application.startup_abc.StartupABC`]
startup: Type[:class:`cpl.application.startup_abc.StartupABC`]
Startup class to use
"""
@@ -35,7 +35,7 @@ class ApplicationBuilderABC(ABC):
r"""Creates custom application object
Returns:
Object of :class:`cpl.core.application.application_abc.ApplicationABC`
Object of :class:`cpl.application.application_abc.ApplicationABC`
"""
@abstractmethod
@@ -43,5 +43,5 @@ class ApplicationBuilderABC(ABC):
r"""Creates custom application object async
Returns:
Object of :class:`cpl.core.application.application_abc.ApplicationABC`
Object of :class:`cpl.application.application_abc.ApplicationABC`
"""

View File

@@ -0,0 +1,14 @@
from abc import ABC, abstractmethod
from cpl.core.configuration.configuration import Configuration
from cpl.dependency import ServiceProviderABC
class ApplicationExtensionABC(ABC):
@abstractmethod
def __init__(self):
pass
@abstractmethod
def run(self, config: Configuration, services: ServiceProviderABC):
pass

View File

@@ -0,0 +1,14 @@
from abc import ABC, abstractmethod
from cpl.core.configuration.configuration import Configuration
from cpl.dependency import ServiceProviderABC
class AsyncApplicationExtensionABC(ABC):
@abstractmethod
def __init__(self):
pass
@abstractmethod
async def run(self, config: Configuration, services: ServiceProviderABC):
pass

View File

@@ -0,0 +1,23 @@
from abc import ABC, abstractmethod
from cpl.dependency.service_collection import ServiceCollection
class AsyncStartupABC(ABC):
r"""ABC for the startup class"""
@abstractmethod
def __init__(self):
pass
@abstractmethod
async def configure_configuration(self):
r"""Creates configuration of application"""
@abstractmethod
async def configure_services(self, service: ServiceCollection):
r"""Creates service provider
Parameter:
services: :class:`cpl.dependency.service_collection`
"""

View File

@@ -1,8 +1,8 @@
from abc import ABC, abstractmethod
from cpl.core.configuration.configuration_abc import ConfigurationABC
from cpl.core.dependency_injection.service_collection_abc import ServiceCollectionABC
from cpl.core.environment.application_environment_abc import ApplicationEnvironmentABC
from cpl.core.configuration.configuration import Configuration
from cpl.dependency.service_collection import ServiceCollection
from cpl.core.environment.environment import Environment
class AsyncStartupExtensionABC(ABC):
@@ -13,19 +13,19 @@ class AsyncStartupExtensionABC(ABC):
pass
@abstractmethod
async def configure_configuration(self, config: ConfigurationABC, env: ApplicationEnvironmentABC):
async def configure_configuration(self, config: Configuration, env: Environment):
r"""Creates configuration of application
Parameter:
config: :class:`cpl.core.configuration.configuration_abc.ConfigurationABC`
config: :class:`cpl.core.configuration.configuration_abc.Configuration`
env: :class:`cpl.core.environment.application_environment_abc`
"""
@abstractmethod
async def configure_services(self, service: ServiceCollectionABC, env: ApplicationEnvironmentABC):
async def configure_services(self, service: ServiceCollection, env: Environment):
r"""Creates service provider
Parameter:
services: :class:`cpl.core.dependency_injection.service_collection_abc`
services: :class:`cpl.dependency.service_collection`
env: :class:`cpl.core.environment.application_environment_abc`
"""

View File

@@ -0,0 +1,31 @@
from abc import ABC, abstractmethod
from cpl.core.configuration import Configuration
from cpl.dependency.service_collection import ServiceCollection
from cpl.core.environment import Environment
class StartupABC(ABC):
r"""ABC for the startup class"""
@abstractmethod
def __init__(self):
pass
@abstractmethod
def configure_configuration(self, config: Configuration, env: Environment):
r"""Creates configuration of application
Parameter:
config: :class:`cpl.core.configuration.configuration_abc.ConfigurationABC`
env: :class:`cpl.core.environment.application_environment_abc`
"""
@abstractmethod
def configure_services(self, service: ServiceCollection, env: Environment):
r"""Creates service provider
Parameter:
services: :class:`cpl.dependency.service_collection`
env: :class:`cpl.core.environment.application_environment_abc`
"""

View File

@@ -1,8 +1,10 @@
from abc import ABC, abstractmethod
from cpl.core.configuration.configuration_abc import ConfigurationABC
from cpl.core.dependency_injection.service_collection_abc import ServiceCollectionABC
from cpl.core.environment.application_environment_abc import ApplicationEnvironmentABC
from cpl.core.configuration import Configuration
from cpl.dependency.service_collection import ServiceCollection
from cpl.core.environment.environment import Environment
class StartupExtensionABC(ABC):
@@ -13,7 +15,7 @@ class StartupExtensionABC(ABC):
pass
@abstractmethod
def configure_configuration(self, config: ConfigurationABC, env: ApplicationEnvironmentABC):
def configure_configuration(self, config: Configuration, env: Environment):
r"""Creates configuration of application
Parameter:
@@ -22,10 +24,10 @@ class StartupExtensionABC(ABC):
"""
@abstractmethod
def configure_services(self, service: ServiceCollectionABC, env: ApplicationEnvironmentABC):
def configure_services(self, service: ServiceCollection, env: Environment):
r"""Creates service provider
Parameter:
services: :class:`cpl.core.dependency_injection.service_collection_abc`
services: :class:`cpl.dependency.service_collection`
env: :class:`cpl.core.environment.application_environment_abc`
"""

View File

@@ -0,0 +1,30 @@
[build-system]
requires = ["setuptools>=70.1.0", "wheel>=0.43.0"]
build-backend = "setuptools.build_meta"
[project]
name = "cpl-application"
version = "2024.7.0"
description = "CPL application"
readme ="CPL application package"
requires-python = ">=3.12"
license = { text = "MIT" }
authors = [
{ name = "Sven Heidemann", email = "sven.heidemann@sh-edraft.de" }
]
keywords = ["cpl", "application", "backend", "shared", "library"]
dynamic = ["dependencies", "optional-dependencies"]
[project.urls]
Homepage = "https://www.sh-edraft.de"
[tool.setuptools.packages.find]
where = ["."]
include = ["cpl*"]
[tool.setuptools.dynamic]
dependencies = { file = ["requirements.txt"] }
optional-dependencies.dev = { file = ["requirements.dev.txt"] }

View File

@@ -0,0 +1 @@
black==25.1.0

View File

@@ -0,0 +1,2 @@
cpl-core
cpl-dependency

View File

@@ -1,91 +0,0 @@
from typing import Type, Optional, Callable, Union
from cpl.core.application.application_abc import ApplicationABC
from cpl.core.application.application_builder_abc import ApplicationBuilderABC
from cpl.core.application.application_extension_abc import ApplicationExtensionABC
from cpl.core.application.async_application_extension_abc import AsyncApplicationExtensionABC
from cpl.core.application.async_startup_abc import AsyncStartupABC
from cpl.core.application.async_startup_extension_abc import AsyncStartupExtensionABC
from cpl.core.application.startup_abc import StartupABC
from cpl.core.application.startup_extension_abc import StartupExtensionABC
from cpl.core.configuration.configuration import Configuration
from cpl.core.dependency_injection.service_collection import ServiceCollection
class ApplicationBuilder(ApplicationBuilderABC):
r"""This is class is used to build an object of :class:`cpl.core.application.application_abc.ApplicationABC`
Parameter:
app: Type[:class:`cpl.core.application.application_abc.ApplicationABC`]
Application to build
"""
def __init__(self, app: Type[ApplicationABC]):
ApplicationBuilderABC.__init__(self)
self._app = app
self._startup: Optional[StartupABC | AsyncStartupABC] = None
self._configuration = Configuration()
self._environment = self._configuration.environment
self._services = ServiceCollection(self._configuration)
self._app_extensions: list[Type[ApplicationExtensionABC | AsyncApplicationExtensionABC]] = []
self._startup_extensions: list[Type[StartupExtensionABC | AsyncStartupABC]] = []
def use_startup(self, startup: Type[StartupABC | AsyncStartupABC]) -> "ApplicationBuilder":
self._startup = startup()
return self
def use_extension(
self, extension: Type[ApplicationExtensionABC | AsyncApplicationExtensionABC | StartupExtensionABC | AsyncStartupExtensionABC]
) -> "ApplicationBuilder":
if (issubclass(extension, ApplicationExtensionABC) or issubclass(extension, AsyncApplicationExtensionABC)) and extension not in self._app_extensions:
self._app_extensions.append(extension)
elif (issubclass(extension, StartupExtensionABC) or issubclass(extension, AsyncStartupExtensionABC)) and extension not in self._startup_extensions:
self._startup_extensions.append(extension)
return self
def _build_startup(self):
for ex in self._startup_extensions:
extension = ex()
extension.configure_configuration(self._configuration, self._environment)
extension.configure_services(self._services, self._environment)
if self._startup is not None:
self._startup.configure_configuration(self._configuration, self._environment)
self._startup.configure_services(self._services, self._environment)
async def _build_async_startup(self):
for ex in self._startup_extensions:
extension = ex()
await extension.configure_configuration(self._configuration, self._environment)
await extension.configure_services(self._services, self._environment)
if self._startup is not None:
await self._startup.configure_configuration(self._configuration, self._environment)
await self._startup.configure_services(self._services, self._environment)
def build(self) -> ApplicationABC:
self._build_startup()
config = self._configuration
services = self._services.build_service_provider()
for ex in self._app_extensions:
extension = ex()
extension.run(config, services)
return self._app(config, services)
async def build_async(self) -> ApplicationABC:
await self._build_async_startup()
config = self._configuration
services = self._services.build_service_provider()
for ex in self._app_extensions:
extension = ex()
await extension.run(config, services)
return self._app(config, services)

View File

@@ -1,14 +0,0 @@
from abc import ABC, abstractmethod
from cpl.core.configuration import ConfigurationABC
from cpl.core.dependency_injection import ServiceProviderABC
class ApplicationExtensionABC(ABC):
@abstractmethod
def __init__(self):
pass
@abstractmethod
def run(self, config: ConfigurationABC, services: ServiceProviderABC):
pass

View File

@@ -1,14 +0,0 @@
from abc import ABC, abstractmethod
from cpl.core.configuration import ConfigurationABC
from cpl.core.dependency_injection import ServiceProviderABC
class AsyncApplicationExtensionABC(ABC):
@abstractmethod
def __init__(self):
pass
@abstractmethod
async def run(self, config: ConfigurationABC, services: ServiceProviderABC):
pass

View File

@@ -1,32 +0,0 @@
from abc import ABC, abstractmethod
from cpl.core.configuration.configuration_abc import ConfigurationABC
from cpl.core.dependency_injection.service_collection_abc import ServiceCollectionABC
from cpl.core.dependency_injection.service_provider_abc import ServiceProviderABC
from cpl.core.environment.application_environment_abc import ApplicationEnvironmentABC
class AsyncStartupABC(ABC):
r"""ABC for the startup class"""
@abstractmethod
def __init__(self):
pass
@abstractmethod
async def configure_configuration(self, config: ConfigurationABC, env: ApplicationEnvironmentABC):
r"""Creates configuration of application
Parameter:
config: :class:`cpl.core.configuration.configuration_abc.ConfigurationABC`
env: :class:`cpl.core.environment.application_environment_abc`
"""
@abstractmethod
async def configure_services(self, service: ServiceCollectionABC, env: ApplicationEnvironmentABC):
r"""Creates service provider
Parameter:
services: :class:`cpl.core.dependency_injection.service_collection_abc`
env: :class:`cpl.core.environment.application_environment_abc`
"""

View File

@@ -1,32 +0,0 @@
from abc import ABC, abstractmethod
from cpl.core.configuration.configuration_abc import ConfigurationABC
from cpl.core.dependency_injection.service_collection_abc import ServiceCollectionABC
from cpl.core.dependency_injection.service_provider_abc import ServiceProviderABC
from cpl.core.environment.application_environment_abc import ApplicationEnvironmentABC
class StartupABC(ABC):
r"""ABC for the startup class"""
@abstractmethod
def __init__(self):
pass
@abstractmethod
def configure_configuration(self, config: ConfigurationABC, env: ApplicationEnvironmentABC):
r"""Creates configuration of application
Parameter:
config: :class:`cpl.core.configuration.configuration_abc.ConfigurationABC`
env: :class:`cpl.core.environment.application_environment_abc`
"""
@abstractmethod
def configure_services(self, service: ServiceCollectionABC, env: ApplicationEnvironmentABC):
r"""Creates service provider
Parameter:
services: :class:`cpl.core.dependency_injection.service_collection_abc`
env: :class:`cpl.core.environment.application_environment_abc`
"""

View File

@@ -1,12 +1,2 @@
from .argument_abc import ArgumentABC
from .argument_builder import ArgumentBuilder
from .argument_executable_abc import ArgumentExecutableABC
from .argument_type_enum import ArgumentTypeEnum
from .configuration import Configuration
from .configuration_abc import ConfigurationABC
from .configuration_model_abc import ConfigurationModelABC
from .configuration_variable_name_enum import ConfigurationVariableNameEnum
from .executable_argument import ExecutableArgument
from .flag_argument import FlagArgument
from .validator_abc import ValidatorABC
from .variable_argument import VariableArgument

View File

@@ -1,64 +0,0 @@
from abc import ABC, abstractmethod
from cpl.core.configuration.argument_type_enum import ArgumentTypeEnum
class ArgumentABC(ABC):
@abstractmethod
def __init__(
self,
token: str,
name: str,
aliases: list[str],
prevent_next_executable: bool = False,
console_arguments: list["ArgumentABC"] = None,
):
r"""Representation of an console argument
Parameter:
token: :class:`str`
name: :class:`str`
aliases: list[:class:`str`]
console_arguments: List[:class:`cpl.core.configuration.console_argument.ConsoleArgument`]
"""
self._token = token
self._name = name
self._aliases = aliases
self._prevent_next_executable = prevent_next_executable
self._console_arguments = console_arguments if console_arguments is not None else []
@property
def token(self) -> str:
return self._token
@property
def name(self) -> str:
return self._name
@property
def aliases(self) -> list[str]:
return self._aliases
@property
def prevent_next_executable(self) -> bool:
return self._prevent_next_executable
@property
def console_arguments(self) -> list["ArgumentABC"]:
return self._console_arguments
def add_console_argument(self, arg_type: ArgumentTypeEnum, *args, **kwargs) -> "ArgumentABC":
r"""Creates and adds a console argument to known console arguments
Parameter:
arg_type: :class:`str`
Specifies the specific type of the argument
Returns:
self :class:`cpl.core.configuration.console_argument.ConsoleArgument` not created argument!
"""
from cpl.core.configuration.argument_builder import ArgumentBuilder
argument = ArgumentBuilder.build_argument(arg_type, *args, *kwargs)
self._console_arguments.append(argument)
return self

View File

@@ -1,30 +0,0 @@
from typing import Union
from cpl.core.configuration.argument_type_enum import ArgumentTypeEnum
from cpl.core.configuration.executable_argument import ExecutableArgument
from cpl.core.configuration.flag_argument import FlagArgument
from cpl.core.configuration.variable_argument import VariableArgument
from cpl.core.console import Console
class ArgumentBuilder:
@staticmethod
def build_argument(
arg_type: ArgumentTypeEnum, *args, **kwargs
) -> Union[ExecutableArgument, FlagArgument, VariableArgument]:
argument = None
try:
match arg_type:
case ArgumentTypeEnum.Flag:
argument = FlagArgument(*args, **kwargs)
case ArgumentTypeEnum.Executable:
argument = ExecutableArgument(*args, **kwargs)
case ArgumentTypeEnum.Variable:
argument = VariableArgument(*args, **kwargs)
case _:
Console.error("Invalid argument type")
Console.close()
except TypeError as e:
Console.error(str(e))
Console.close()
return argument

View File

@@ -1,11 +0,0 @@
from abc import ABC, abstractmethod
class ArgumentExecutableABC(ABC):
@abstractmethod
def __init__(self):
pass
@abstractmethod
def run(self, args: list[str]):
pass

View File

@@ -1,7 +0,0 @@
from enum import Enum
class ArgumentTypeEnum(Enum):
Flag = 0
Executable = 1
Variable = 3

View File

@@ -1,65 +1,19 @@
import inspect
import json
import os
import sys
import traceback
from collections.abc import Callable
from typing import Union, Optional
from typing import Any
from cpl.core.configuration.argument_abc import ArgumentABC
from cpl.core.configuration.argument_builder import ArgumentBuilder
from cpl.core.configuration.argument_type_enum import ArgumentTypeEnum
from cpl.core.configuration.configuration_abc import ConfigurationABC
from cpl.core.configuration.configuration_model_abc import ConfigurationModelABC
from cpl.core.configuration.configuration_variable_name_enum import (
ConfigurationVariableNameEnum,
)
from cpl.core.configuration.executable_argument import ExecutableArgument
from cpl.core.configuration.flag_argument import FlagArgument
from cpl.core.configuration.variable_argument import VariableArgument
from cpl.core.console.console import Console
from cpl.core.console.foreground_color_enum import ForegroundColorEnum
from cpl.core.dependency_injection.service_provider_abc import ServiceProviderABC
from cpl.core.environment.application_environment import ApplicationEnvironment
from cpl.core.environment.application_environment_abc import ApplicationEnvironmentABC
from cpl.core.environment.environment_name_enum import EnvironmentNameEnum
from cpl.core.typing import T, R
from cpl.core.environment.environment import Environment
from cpl.core.typing import D, T
from cpl.core.utils.json_processor import JSONProcessor
class Configuration(ConfigurationABC):
def __init__(self):
r"""Representation of configuration"""
ConfigurationABC.__init__(self)
self._application_environment = ApplicationEnvironment()
self._config: dict[Union[type, str], Union[ConfigurationModelABC, str]] = {}
self._argument_types: list[ArgumentABC] = []
self._additional_arguments: list[str] = []
self._argument_error_function: Optional[Callable] = None
self._handled_args = []
@property
def environment(self) -> ApplicationEnvironmentABC:
return self._application_environment
@property
def additional_arguments(self) -> list[str]:
return self._additional_arguments
@property
def argument_error_function(self) -> Optional[Callable]:
return self._argument_error_function
@argument_error_function.setter
def argument_error_function(self, argument_error_function: Callable):
self._argument_error_function = argument_error_function
@property
def arguments(self) -> list[ArgumentABC]:
return self._argument_types
class Configuration:
_config = {}
@staticmethod
def _print_info(message: str):
@@ -72,7 +26,7 @@ class Configuration(ConfigurationABC):
Info message
"""
Console.set_foreground_color(ForegroundColorEnum.green)
Console.write_line(f"[CONF] {message}")
Console.write_line(f"[CONFIG] {message}")
Console.set_foreground_color(ForegroundColorEnum.default)
@staticmethod
@@ -86,7 +40,7 @@ class Configuration(ConfigurationABC):
Warning message
"""
Console.set_foreground_color(ForegroundColorEnum.yellow)
Console.write_line(f"[CONF] {message}")
Console.write_line(f"[CONFIG] {message}")
Console.set_foreground_color(ForegroundColorEnum.default)
@staticmethod
@@ -100,31 +54,11 @@ class Configuration(ConfigurationABC):
Error message
"""
Console.set_foreground_color(ForegroundColorEnum.red)
Console.write_line(f"[CONF] {message}")
Console.write_line(f"[CONFIG] {message}")
Console.set_foreground_color(ForegroundColorEnum.default)
def _set_variable(self, name: str, value: any):
r"""Sets variable to given value
Parameter:
name: :class:`str`
Name of the variable
value: :class:`any`
Value of the variable
"""
if name == ConfigurationVariableNameEnum.environment.value:
self._application_environment.environment_name = EnvironmentNameEnum(value)
elif name == ConfigurationVariableNameEnum.name.value:
self._application_environment.application_name = value
elif name == ConfigurationVariableNameEnum.customer.value:
self._application_environment.customer = value
else:
self._config[name] = value
def _load_json_file(self, file: str, output: bool) -> dict:
@classmethod
def _load_json_file(cls, file: str, output: bool) -> dict:
r"""Reads the json file
Parameter:
@@ -142,90 +76,19 @@ class Configuration(ConfigurationABC):
# load json
json_cfg = json.load(cfg)
if output:
self._print_info(f"Loaded config file: {file}")
cls._print_info(f"Loaded config file: {file}")
return json_cfg
except Exception as e:
self._print_error(f"Cannot load config file: {file}! -> {e}")
cls._print_error(f"Cannot load config file: {file}! -> {e}")
return {}
def _parse_arguments(
self,
executables: list[ArgumentABC],
arg_list: list[str],
args_types: list[ArgumentABC],
):
for i in range(0, len(arg_list)):
arg_str = arg_list[i]
for n in range(0, len(args_types)):
arg = args_types[n]
arg_str_without_token = arg_str
if arg.token != "" and arg.token in arg_str:
arg_str_without_token = arg_str.split(arg.token)[1]
# executable
if isinstance(arg, ExecutableArgument):
if (
arg_str.startswith(arg.token)
and arg_str_without_token == arg.name
or arg_str_without_token in arg.aliases
):
executables.append(arg)
self._handled_args.append(arg_str)
self._parse_arguments(executables, arg_list[i + 1 :], arg.console_arguments)
# variables
elif isinstance(arg, VariableArgument):
arg_str_without_value = arg_str_without_token
if arg.value_token in arg_str_without_value:
arg_str_without_value = arg_str_without_token.split(arg.value_token)[0]
if (
arg_str.startswith(arg.token)
and arg_str_without_value == arg.name
or arg_str_without_value in arg.aliases
):
if arg.value_token != " ":
value = arg_str_without_token.split(arg.value_token)[1]
else:
value = arg_list[i + 1]
self._set_variable(arg.name, value)
self._handled_args.append(arg_str)
self._handled_args.append(value)
self._parse_arguments(executables, arg_list[i + 1 :], arg.console_arguments)
# flags
elif isinstance(arg, FlagArgument):
if (
arg_str.startswith(arg.token)
and arg_str_without_token == arg.name
or arg_str_without_token in arg.aliases
):
if arg_str in self._additional_arguments:
self._additional_arguments.remove(arg_str)
self._additional_arguments.append(arg.name)
self._handled_args.append(arg_str)
self._parse_arguments(executables, arg_list[i + 1 :], arg.console_arguments)
# add left over values to args
if arg_str not in self._additional_arguments and arg_str not in self._handled_args:
self._additional_arguments.append(arg_str)
def add_environment_variables(self, prefix: str):
for env_var in os.environ.keys():
if not env_var.startswith(prefix):
continue
self._set_variable(env_var.replace(prefix, ""), os.environ[env_var])
def add_console_argument(self, argument: ArgumentABC):
self._argument_types.append(argument)
def add_json_file(self, name: str, optional: bool = None, output: bool = True, path: str = None):
@classmethod
def add_json_file(cls, name: str, optional: bool = None, output: bool = True, path: str = None):
if os.path.isabs(name):
file_path = name
else:
path_root = self._application_environment.working_directory
path_root = Environment.get_cwd()
if path is not None:
path_root = path
@@ -237,122 +100,35 @@ class Configuration(ConfigurationABC):
if not os.path.isfile(file_path):
if optional is not True:
if output:
self._print_error(f"File not found: {file_path}")
cls._print_error(f"File not found: {file_path}")
sys.exit()
if output:
self._print_warn(__name__, f"Not Loaded config file: {file_path}")
cls._print_warn(f"Not Loaded config file: {file_path}")
return None
config_from_file = self._load_json_file(file_path, output)
config_from_file = cls._load_json_file(file_path, output)
for sub in ConfigurationModelABC.__subclasses__():
for key, value in config_from_file.items():
if sub.__name__ != key and sub.__name__.replace("Settings", "") != key:
continue
configuration = sub()
from_dict = getattr(configuration, "from_dict", None)
configuration = JSONProcessor.process(sub, value)
if from_dict is not None and not hasattr(from_dict, "is_base_func"):
Console.set_foreground_color(ForegroundColorEnum.yellow)
Console.write_line(
f"{sub.__name__}.from_dict is deprecated. Instead, set attributes as typed arguments in __init__. They can be None by default!"
)
Console.color_reset()
configuration.from_dict(value)
else:
configuration = JSONProcessor.process(sub, value)
cls.set(sub, configuration)
self.add_configuration(sub, configuration)
@classmethod
def set(cls, key: Any, value: T):
if inspect.isclass(key):
key = key.__name__
def add_configuration(self, key_type: T, value: any):
self._config[key_type] = value
cls._config[key] = value
def create_console_argument(
self,
arg_type: ArgumentTypeEnum,
token: str,
name: str,
aliases: list[str],
*args,
**kwargs,
) -> ArgumentABC:
argument = ArgumentBuilder.build_argument(arg_type, token, name, aliases, *args, **kwargs)
self._argument_types.append(argument)
return argument
@classmethod
def get(cls, key: Any, default: D = None) -> T | D:
if inspect.isclass(key):
key = key.__name__
def for_each_argument(self, call: Callable):
for arg in self._argument_types:
call(arg)
def get_configuration(self, search_type: T) -> Optional[R]:
if type(search_type) is str:
if search_type == ConfigurationVariableNameEnum.environment.value:
return self._application_environment.environment_name
elif search_type == ConfigurationVariableNameEnum.name.value:
return self._application_environment.application_name
elif search_type == ConfigurationVariableNameEnum.customer.value:
return self._application_environment.customer
if search_type not in self._config:
return None
for config_model in self._config:
if config_model == search_type:
return self._config[config_model]
def parse_console_arguments(self, services: ServiceProviderABC, error: bool = None) -> bool:
# sets environment variables as possible arguments as: --VAR=VALUE
for arg_name in ConfigurationVariableNameEnum.to_list():
self.add_console_argument(VariableArgument("--", str(arg_name).upper(), [str(arg_name).lower()], "="))
success = False
try:
arg_list = sys.argv[1:]
executables: list[ExecutableArgument] = []
self._parse_arguments(executables, arg_list, self._argument_types)
except Exception:
Console.error("An error occurred while parsing arguments.", traceback.format_exc())
sys.exit()
try:
prevent = False
for exe in executables:
if prevent:
continue
if exe.validators is not None:
abort = False
for validator_type in exe.validators:
validator = services.get_service(validator_type)
result = validator.validate()
abort = not result
if abort:
break
if abort:
sys.exit()
cmd = services.get_service(exe.executable_type)
self._handle_pre_or_post_executables(True, exe, services)
self._set_variable("ACTIVE_EXECUTABLE", exe.name)
args = self.get_configuration("ARGS")
if args is not None:
for arg in args.split(" "):
if arg == "":
continue
self._additional_arguments.append(arg)
cmd.run(self._additional_arguments)
self._handle_pre_or_post_executables(False, exe, services)
prevent = exe.prevent_next_executable
success = True
except Exception:
Console.error("An error occurred while executing arguments.", traceback.format_exc())
sys.exit()
return success
return cls._config.get(key, default)

View File

@@ -1,140 +0,0 @@
from abc import abstractmethod, ABC
from collections.abc import Callable
from typing import Optional
from cpl.core.configuration.argument_abc import ArgumentABC
from cpl.core.configuration.argument_type_enum import ArgumentTypeEnum
from cpl.core.environment.application_environment_abc import ApplicationEnvironmentABC
from cpl.core.typing import T, R
class ConfigurationABC(ABC):
@abstractmethod
def __init__(self):
r"""ABC for the :class:`cpl.core.configuration.configuration.Configuration`"""
@property
@abstractmethod
def environment(self) -> ApplicationEnvironmentABC:
pass
@property
@abstractmethod
def additional_arguments(self) -> list[str]:
pass
@property
@abstractmethod
def argument_error_function(self) -> Optional[Callable]:
pass
@argument_error_function.setter
@abstractmethod
def argument_error_function(self, argument_error_function: Callable):
pass
@property
@abstractmethod
def arguments(self) -> list[ArgumentABC]:
pass
@abstractmethod
def add_environment_variables(self, prefix: str):
r"""Reads the environment variables
Parameter:
prefix: :class:`str`
Prefix of the variables
"""
@abstractmethod
def add_console_argument(self, argument: ArgumentABC):
r"""Adds console argument to known console arguments
Parameter:
argument: :class:`cpl.core.configuration.console_argument.ConsoleArgumentABC`
Specifies the console argument
"""
@abstractmethod
def add_json_file(self, name: str, optional: bool = None, output: bool = True, path: str = None):
r"""Reads and saves settings from given json file
Parameter:
name: :class:`str`
Name of the file
optional: :class:`str`
Specifies whether an error should occur if the file was not found
output: :class:`bool`
Specifies whether an output should take place
path: :class:`str`
Path in which the file should be stored
"""
@abstractmethod
def add_configuration(self, key_type: T, value: any):
r"""Add configuration object
Parameter:
key_type: :class:`cpl.core.type.T`
Type of the value
value: any
Object of the value
"""
@abstractmethod
def create_console_argument(
self, arg_type: ArgumentTypeEnum, token: str, name: str, aliases: list[str], *args, **kwargs
) -> ArgumentABC:
r"""Creates and adds a console argument to known console arguments
Parameter:
token: :class:`str`
Specifies optional beginning of argument
name :class:`str`
Specifies name of argument
aliases list[:class:`str`]
Specifies possible aliases of name
value_token :class:`str`
Specifies were the value begins
is_value_token_optional :class:`bool`
Specifies if values are optional
runnable: :class:`cpl.core.configuration.console_argument.ConsoleArgumentABC`
Specifies class to run when called if value is not None
Returns:
Object of :class:`cpl.core.configuration.console_argument.ConsoleArgumentABC`
"""
@abstractmethod
def for_each_argument(self, call: Callable):
r"""Iterates through all arguments and calls the call function
Parameter:
call: :class:`Callable`
Call for each argument
"""
@abstractmethod
def get_configuration(self, search_type: T) -> Optional[R]:
r"""Returns value from configuration by given type
Parameter:
search_type: :class:`cpl.core.type.T`
Type to search for
Returns:
Object of Union[:class:`str`, :class:`cpl.core.configuration.configuration_model_abc.ConfigurationModelABC`]
"""
@abstractmethod
def parse_console_arguments(self, services: "ServiceProviderABC", error: bool = None) -> bool:
r"""Reads the console arguments
Parameter:
error: :class:`bool`
Defines is invalid argument error will be shown or not
Returns:
Bool to specify if executables were executed or not.
"""

View File

@@ -1,21 +1,5 @@
from abc import ABC, abstractmethod
def base_func(method):
method.is_base_func = True
return method
from abc import ABC
class ConfigurationModelABC(ABC):
@abstractmethod
def __init__(self):
r"""ABC for settings representation"""
@base_func
def from_dict(self, settings: dict):
r"""DEPRECATED: Set attributes as typed arguments in __init__ instead. See https://docs.sh-edraft.de/cpl/deprecated.html#ConfigurationModelABC-from_dict-method for further information
Converts attributes to dict
Parameter:
settings: :class:`dict`
"""
pass

View File

@@ -1,11 +0,0 @@
from enum import Enum
class ConfigurationVariableNameEnum(Enum):
environment = "ENVIRONMENT"
name = "NAME"
customer = "CUSTOMER"
@staticmethod
def to_list():
return [var.value for var in ConfigurationVariableNameEnum]

View File

@@ -1,40 +0,0 @@
from typing import Type, Optional
from cpl.core.configuration.argument_executable_abc import ArgumentExecutableABC
from cpl.core.configuration.argument_abc import ArgumentABC
from cpl.core.configuration.validator_abc import ValidatorABC
class ExecutableArgument(ArgumentABC):
def __init__(
self,
token: str,
name: str,
aliases: list[str],
executable: Type[ArgumentExecutableABC],
prevent_next_executable: bool = False,
validators: list[Type[ValidatorABC]] = None,
console_arguments: list["ArgumentABC"] = None,
):
self._executable_type = executable
self._validators = validators
self._executable: Optional[ArgumentExecutableABC] = None
ArgumentABC.__init__(self, token, name, aliases, prevent_next_executable, console_arguments)
@property
def executable_type(self) -> type:
return self._executable_type
def set_executable(self, executable: ArgumentExecutableABC):
self._executable = executable
@property
def validators(self) -> list[Type[ValidatorABC]]:
return self._validators
def run(self, args: list[str]):
r"""Executes runnable if exists"""
if self._executable is None:
return
self._executable.execute(args)

View File

@@ -1,13 +0,0 @@
from cpl.core.configuration.argument_abc import ArgumentABC
class FlagArgument(ArgumentABC):
def __init__(
self,
token: str,
name: str,
aliases: list[str],
prevent_next_executable: bool = False,
console_arguments: list["ArgumentABC"] = None,
):
ArgumentABC.__init__(self, token, name, aliases, prevent_next_executable, console_arguments)

View File

@@ -1,11 +0,0 @@
from abc import ABC, abstractmethod
class ValidatorABC(ABC):
@abstractmethod
def __init__(self):
pass
@abstractmethod
def validate(self) -> bool:
pass

View File

@@ -1,28 +0,0 @@
from cpl.core.configuration.argument_abc import ArgumentABC
class VariableArgument(ArgumentABC):
def __init__(
self,
token: str,
name: str,
aliases: list[str],
value_token: str,
prevent_next_executable: bool = False,
console_arguments: list["ArgumentABC"] = None,
):
self._value_token = value_token
self._value: str = ""
ArgumentABC.__init__(self, token, name, aliases, prevent_next_executable, console_arguments)
@property
def value_token(self) -> str:
return self._value_token
@property
def value(self) -> str:
return self._value
def set_value(self, value: str):
self._value = value

View File

@@ -1,5 +1,4 @@
from .background_color_enum import BackgroundColorEnum
from .console import Console
from .console_call import ConsoleCall
from ._call import ConsoleCall
from .foreground_color_enum import ForegroundColorEnum
from .spinner_thread import SpinnerThread

View File

@@ -1,7 +1,8 @@
import os
import sys
import threading
import multiprocessing
import time
from multiprocessing import Process
from termcolor import colored
@@ -9,8 +10,8 @@ from cpl.core.console.background_color_enum import BackgroundColorEnum
from cpl.core.console.foreground_color_enum import ForegroundColorEnum
class SpinnerThread(threading.Thread):
r"""Thread to show spinner in terminal
class Spinner(Process):
r"""Process to show spinner in terminal
Parameter:
msg_len: :class:`int`
@@ -22,7 +23,7 @@ class SpinnerThread(threading.Thread):
"""
def __init__(self, msg_len: int, foreground_color: ForegroundColorEnum, background_color: BackgroundColorEnum):
threading.Thread.__init__(self)
Process.__init__(self)
self._msg_len = msg_len
self._foreground_color = foreground_color
@@ -50,29 +51,26 @@ class SpinnerThread(threading.Thread):
return color_args
def run(self) -> None:
r"""Entry point of thread, shows the spinner"""
r"""Entry point of process, shows the spinner"""
columns = 0
if sys.platform == "win32":
columns = os.get_terminal_size().columns
else:
term_rows, term_columns = os.popen("stty size", "r").read().split()
values = os.popen("stty size", "r").read().split()
term_rows, term_columns = values if len(values) == 2 else (0, 0)
columns = int(term_columns)
end_msg = "done"
end_msg_pos = columns - self._msg_len - len(end_msg)
if end_msg_pos > 0:
print(f'{"" : >{end_msg_pos}}', end="")
padding = columns - self._msg_len - len(end_msg)
if padding > 0:
print(f'{"" : >{padding}}', end="")
else:
print("", end="")
first = True
spinner = self._spinner()
while self._is_spinning:
if first:
first = False
print(colored(f"{next(spinner): >{len(end_msg) - 1}}", *self._get_color_args()), end="")
else:
print(colored(f"{next(spinner): >{len(end_msg)}}", *self._get_color_args()), end="")
print(colored(f"{next(spinner): >{len(end_msg)}}", *self._get_color_args()), end="")
time.sleep(0.1)
back = ""
for i in range(0, len(end_msg)):
@@ -84,9 +82,10 @@ class SpinnerThread(threading.Thread):
if not self._exit:
print(colored(end_msg, *self._get_color_args()), end="")
def stop_spinning(self):
def stop(self):
r"""Stops the spinner"""
self._is_spinning = False
super().terminate()
time.sleep(0.1)
def exit(self):

View File

@@ -10,9 +10,9 @@ from tabulate import tabulate
from termcolor import colored
from cpl.core.console.background_color_enum import BackgroundColorEnum
from cpl.core.console.console_call import ConsoleCall
from cpl.core.console._call import ConsoleCall
from cpl.core.console.foreground_color_enum import ForegroundColorEnum
from cpl.core.console.spinner_thread import SpinnerThread
from cpl.core.console._spinner import Spinner
class Console:
@@ -464,7 +464,7 @@ class Console:
cls.set_hold_back(True)
spinner = None
if not cls._disabled:
spinner = SpinnerThread(len(message), spinner_foreground_color, spinner_background_color)
spinner = Spinner(len(message), spinner_foreground_color, spinner_background_color)
spinner.start()
return_value = None
@@ -476,7 +476,7 @@ class Console:
cls.close()
if spinner is not None:
spinner.stop_spinning()
spinner.stop()
cls.set_hold_back(False)
cls.set_foreground_color(ForegroundColorEnum.default)

View File

@@ -1,3 +0,0 @@
from .database_settings_name_enum import DatabaseSettingsNameEnum
from .database_settings import DatabaseSettings
from .table_abc import TableABC

View File

@@ -1,2 +0,0 @@
from .database_connection import DatabaseConnection
from .database_connection_abc import DatabaseConnectionABC

View File

@@ -1,2 +0,0 @@
from .database_context import DatabaseContext
from .database_context_abc import DatabaseContextABC

View File

@@ -1,52 +0,0 @@
from typing import Optional
from cpl.core.database.connection.database_connection import DatabaseConnection
from cpl.core.database.connection.database_connection_abc import DatabaseConnectionABC
from cpl.core.database.context.database_context_abc import DatabaseContextABC
from cpl.core.database.database_settings import DatabaseSettings
from mysql.connector.cursor import MySQLCursorBuffered
class DatabaseContext(DatabaseContextABC):
r"""Representation of the database context
Parameter:
database_settings: :class:`cpl.core.database.database_settings.DatabaseSettings`
"""
def __init__(self):
DatabaseContextABC.__init__(self)
self._db: DatabaseConnectionABC = DatabaseConnection()
self._settings: Optional[DatabaseSettings] = None
@property
def cursor(self) -> MySQLCursorBuffered:
self._ping_and_reconnect()
return self._db.cursor
def _ping_and_reconnect(self):
try:
self._db.server.ping(reconnect=True, attempts=3, delay=5)
except Exception:
# reconnect your cursor as you did in __init__ or wherever
if self._settings is None:
raise Exception("Call DatabaseContext.connect first")
self.connect(self._settings)
def connect(self, database_settings: DatabaseSettings):
if self._settings is None:
self._settings = database_settings
self._db.connect(database_settings)
self.save_changes()
def save_changes(self):
self._ping_and_reconnect()
self._db.server.commit()
def select(self, statement: str) -> list[tuple]:
self._ping_and_reconnect()
self._db.cursor.execute(statement)
return self._db.cursor.fetchall()

View File

@@ -1,40 +0,0 @@
from abc import ABC, abstractmethod
from cpl.core.database.database_settings import DatabaseSettings
from mysql.connector.cursor import MySQLCursorBuffered
class DatabaseContextABC(ABC):
r"""ABC for the :class:`cpl.core.database.context.database_context.DatabaseContext`"""
@abstractmethod
def __init__(self, *args):
pass
@property
@abstractmethod
def cursor(self) -> MySQLCursorBuffered:
pass
@abstractmethod
def connect(self, database_settings: DatabaseSettings):
r"""Connects to a database by connection settings
Parameter:
database_settings :class:`cpl.core.database.database_settings.DatabaseSettings`
"""
@abstractmethod
def save_changes(self):
r"""Saves changes of the database"""
@abstractmethod
def select(self, statement: str) -> list[tuple]:
r"""Runs SQL Statements
Parameter:
statement: :class:`str`
Returns:
list: Fetched list of selected elements
"""

View File

@@ -1,13 +0,0 @@
from enum import Enum
class DatabaseSettingsNameEnum(Enum):
host = "Host"
port = "Port"
user = "User"
password = "Password"
database = "Database"
charset = "Charset"
use_unicode = "UseUnicode"
buffered = "Buffered"
auth_plugin = "AuthPlugin"

View File

@@ -1,18 +0,0 @@
from cpl.core.dependency_injection.scope import Scope
from cpl.core.dependency_injection.scope_abc import ScopeABC
from cpl.core.dependency_injection.service_provider_abc import ServiceProviderABC
class ScopeBuilder:
r"""Class to build :class:`cpl.core.dependency_injection.scope.Scope`"""
def __init__(self, service_provider: ServiceProviderABC) -> None:
self._service_provider = service_provider
def build(self) -> ScopeABC:
r"""Returns scope
Returns:
Object of type :class:`cpl.core.dependency_injection.scope.Scope`
"""
return Scope(self._service_provider)

View File

@@ -1,90 +0,0 @@
from abc import abstractmethod, ABC
from typing import Type
from cpl.core.database.context.database_context_abc import DatabaseContextABC
from cpl.core.database.database_settings import DatabaseSettings
from cpl.core.dependency_injection.service_provider_abc import ServiceProviderABC
from cpl.core.typing import T, Source
class ServiceCollectionABC(ABC):
r"""ABC for the class :class:`cpl.core.dependency_injection.service_collection.ServiceCollection`"""
@abstractmethod
def __init__(self):
pass
@abstractmethod
def add_db_context(self, db_context_type: Type[DatabaseContextABC], db_settings: DatabaseSettings):
r"""Adds database context
Parameter:
db_context: Type[:class:`cpl.core.database.context.database_context_abc.DatabaseContextABC`]
Database context
"""
@abstractmethod
def add_logging(self):
r"""Adds the CPL internal logger"""
@abstractmethod
def add_pipes(self):
r"""Adds the CPL internal pipes as transient"""
def add_translation(self):
r"""Adds the CPL translation"""
raise NotImplementedError("You should install and use the cpl-translation package")
def add_mail(self):
r"""Adds the CPL mail"""
raise NotImplementedError("You should install and use the cpl-mail package")
@abstractmethod
def add_transient(self, service_type: T, service: T = None) -> "ServiceCollectionABC":
r"""Adds a service with transient lifetime
Parameter:
service_type: :class:`Type`
Type of the service
service: :class:`Callable`
Object of the service
Returns:
self: :class:`cpl.core.dependency_injection.service_collection_abc.ServiceCollectionABC`
"""
@abstractmethod
def add_scoped(self, service_type: T, service: T = None) -> "ServiceCollectionABC":
r"""Adds a service with scoped lifetime
Parameter:
service_type: :class:`Type`
Type of the service
service: :class:`Callable`
Object of the service
Returns:
self: :class:`cpl.core.dependency_injection.service_collection_abc.ServiceCollectionABC`
"""
@abstractmethod
def add_singleton(self, service_type: T, service: T = None) -> "ServiceCollectionABC":
r"""Adds a service with singleton lifetime
Parameter:
service_type: :class:`Type`
Type of the service
service: :class:`Callable`
Object of the service
Returns:
self: :class:`cpl.core.dependency_injection.service_collection_abc.ServiceCollectionABC`
"""
@abstractmethod
def build_service_provider(self) -> ServiceProviderABC:
r"""Creates instance of the service provider
Returns:
Object of type :class:`cpl.core.dependency_injection.service_provider_abc.ServiceProviderABC`
"""

View File

@@ -1,3 +1,2 @@
from .application_environment_abc import ApplicationEnvironmentABC
from .environment_name_enum import EnvironmentNameEnum
from .application_environment import ApplicationEnvironment
from .environment_enum import EnvironmentEnum
from .environment import Environment

View File

@@ -1,95 +0,0 @@
import os
from datetime import datetime
from socket import gethostname
from typing import Optional
from cpl.core.environment.application_environment_abc import ApplicationEnvironmentABC
from cpl.core.environment.environment_name_enum import EnvironmentNameEnum
class ApplicationEnvironment(ApplicationEnvironmentABC):
r"""Represents environment of the application
Parameter:
name: :class:`cpl.core.environment.environment_name_enum.EnvironmentNameEnum`
"""
def __init__(self, name: EnvironmentNameEnum = EnvironmentNameEnum.production):
ApplicationEnvironmentABC.__init__(self)
self._environment_name: Optional[EnvironmentNameEnum] = name
self._app_name: Optional[str] = None
self._customer: Optional[str] = None
self._start_time: datetime = datetime.now()
self._end_time: datetime = datetime.now()
self._runtime_directory = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
self._working_directory = os.getcwd()
@property
def environment_name(self) -> str:
return str(self._environment_name.value)
@environment_name.setter
def environment_name(self, environment_name: str):
self._environment_name = EnvironmentNameEnum(environment_name)
@property
def application_name(self) -> str:
return self._app_name if self._app_name is not None else ""
@application_name.setter
def application_name(self, application_name: str):
self._app_name = application_name
@property
def customer(self) -> str:
return self._customer if self._customer is not None else ""
@customer.setter
def customer(self, customer: str):
self._customer = customer
@property
def host_name(self):
return gethostname()
@property
def start_time(self) -> datetime:
return self._start_time
@property
def end_time(self) -> datetime:
return self._end_time
@end_time.setter
def end_time(self, end_time: datetime):
self._end_time = end_time
@property
def date_time_now(self) -> datetime:
return datetime.now()
@property
def working_directory(self) -> str:
return str(self._working_directory)
@property
def runtime_directory(self) -> str:
return str(self._runtime_directory)
def set_runtime_directory(self, runtime_directory: str):
if runtime_directory != "":
self._runtime_directory = runtime_directory
return
self._runtime_directory = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
def set_working_directory(self, working_directory: str):
if working_directory != "":
self._working_directory = working_directory
os.chdir(self._working_directory)
return
self._working_directory = os.path.abspath("./")
os.chdir(self._working_directory)

View File

@@ -1,98 +0,0 @@
from abc import ABC, abstractmethod
from datetime import datetime
class ApplicationEnvironmentABC(ABC):
r"""ABC of the class :class:`cpl.core.environment.application_environment.ApplicationEnvironment`"""
@abstractmethod
def __init__(self):
pass
@property
@abstractmethod
def environment_name(self) -> str:
pass
@environment_name.setter
@abstractmethod
def environment_name(self, environment_name: str):
pass
@property
@abstractmethod
def application_name(self) -> str:
pass
@application_name.setter
@abstractmethod
def application_name(self, application_name: str):
pass
@property
@abstractmethod
def customer(self) -> str:
pass
@customer.setter
@abstractmethod
def customer(self, customer: str):
pass
@property
@abstractmethod
def host_name(self) -> str:
pass
@property
@abstractmethod
def start_time(self) -> datetime:
pass
@start_time.setter
@abstractmethod
def start_time(self, start_time: datetime):
pass
@property
@abstractmethod
def end_time(self):
pass
@end_time.setter
@abstractmethod
def end_time(self, end_time: datetime):
pass
@property
@abstractmethod
def date_time_now(self) -> datetime:
pass
@property
@abstractmethod
def working_directory(self) -> str:
pass
@property
@abstractmethod
def runtime_directory(self) -> str:
pass
@abstractmethod
def set_runtime_directory(self, runtime_directory: str):
r"""Sets the current runtime directory
Parameter:
runtime_directory: :class:`str`
Path of the runtime directory
"""
@abstractmethod
def set_working_directory(self, working_directory: str):
r"""Sets the current working directory
Parameter:
working_directory: :class:`str`
Path of the current working directory
"""

View File

@@ -1,22 +1,58 @@
import os
from socket import gethostname
from typing import Optional, Type
from cpl.core.environment.environment_enum import EnvironmentEnum
from cpl.core.typing import T
from cpl.core.utils.get_value import get_value
class Environment:
_environment = "production"
r"""Represents environment of the application
Parameter:
name: :class:`cpl.core.environment.environment_name_enum.EnvironmentNameEnum`
"""
@classmethod
def get_environment(cls):
return cls._environment
return cls.get("ENVIRONMENT", str, EnvironmentEnum.production.value)
@classmethod
def set_environment(cls, environment: str):
if environment not in ["development", "staging", "production"]:
raise ValueError("Invalid environment")
Environment._environment = environment
assert environment is not None and environment != "", "environment must not be None or empty"
assert environment.lower() in [
e.value for e in EnvironmentEnum
], f"environment must be one of {[e.value for e in EnvironmentEnum]}"
cls.set("ENVIRONMENT", environment.lower())
@classmethod
def get_app_name(cls) -> str:
return cls.get("APP_NAME", str)
@classmethod
def set_app_name(cls, app_name: str):
cls.set("APP_NAME", app_name)
@staticmethod
def get_host_name() -> str:
return gethostname()
@staticmethod
def get_cwd() -> str:
return os.getcwd()
@staticmethod
def set_cwd(working_directory: str):
assert working_directory is not None and working_directory != "", "working_directory must not be None or empty"
os.chdir(working_directory)
@staticmethod
def set(key: str, value: T):
assert key is not None and key != "", "key must not be None or empty"
os.environ[key] = str(value)
@staticmethod
def get(key: str, cast_type: Type[T], default: Optional[T] = None) -> Optional[T]:

View File

@@ -1,7 +1,7 @@
from enum import Enum
class EnvironmentNameEnum(Enum):
class EnvironmentEnum(Enum):
production = "production"
staging = "staging"
testing = "testing"

View File

@@ -1,94 +0,0 @@
import multiprocessing
import os
from datetime import datetime
from typing import Self
from cpl.core.console import Console
from cpl.core.log.log_level_enum import LogLevelEnum
class LogWriter:
_instance = None
# ANSI color codes for different log levels
_COLORS = {
LogLevelEnum.trace: "\033[37m", # Light Gray
LogLevelEnum.debug: "\033[94m", # Blue
LogLevelEnum.info: "\033[92m", # Green
LogLevelEnum.warning: "\033[93m", # Yellow
LogLevelEnum.error: "\033[91m", # Red
LogLevelEnum.fatal: "\033[95m", # Magenta
}
def __init__(self, file_prefix: str, level: LogLevelEnum = LogLevelEnum.info):
self._file_prefix = file_prefix
self._level = level
self._queue = multiprocessing.Queue()
self._process = multiprocessing.Process(target=self._log_worker, daemon=True)
self._create_log_dir()
self._process.start()
@property
def level(self) -> LogLevelEnum:
return self._level
@level.setter
def level(self, value: LogLevelEnum):
assert isinstance(value, LogLevelEnum), "Log level must be an instance of LogLevelEnum"
self._level = value
@classmethod
def get_instance(cls, file_prefix: str, level: LogLevelEnum = LogLevelEnum.info) -> Self:
if cls._instance is None:
cls._instance = LogWriter(file_prefix, level)
return cls._instance
@staticmethod
def _create_log_dir():
if os.path.exists("logs"):
return
os.makedirs("logs")
def _log_worker(self):
"""Worker process that writes log messages from the queue to the file."""
while True:
content = self._queue.get()
if content is None: # Shutdown signal
break
self._write_log_to_file(content)
Console.write_line(
f"{self._COLORS.get(self._level, '\033[0m')}{content}\033[0m"
)
@property
def log_file(self):
return f"logs/{self._file_prefix}_{datetime.now().strftime('%Y-%m-%d')}.log"
def _ensure_file_size(self):
log_file = self.log_file
if not os.path.exists(log_file) or os.path.getsize(log_file) <= 0.5 * 1024 * 1024:
return
# if exists and size is greater than 300MB, create a new file
os.rename(
log_file,
f"{log_file.split('.log')[0]}_{datetime.now().strftime('%H-%M-%S')}.log",
)
def _write_log_to_file(self, content: str):
self._ensure_file_size()
with open(self.log_file, "a") as log_file:
log_file.write(content + "\n")
log_file.close()
def log(self, content: str):
"""Enqueue log message without blocking main app."""
self._queue.put(content)
def close(self):
"""Gracefully stop the logging process."""
self._queue.put(None)
self._process.join()

View File

@@ -3,7 +3,6 @@ import traceback
from datetime import datetime
from cpl.core.console import Console
from cpl.core.log._log_writer import LogWriter
from cpl.core.log.log_level_enum import LogLevelEnum
from cpl.core.log.logger_abc import LoggerABC
from cpl.core.typing import Messages, Source
@@ -13,6 +12,16 @@ class Logger(LoggerABC):
_level = LogLevelEnum.info
_levels = [x for x in LogLevelEnum]
# ANSI color codes for different log levels
_COLORS = {
LogLevelEnum.trace: "\033[37m", # Light Gray
LogLevelEnum.debug: "\033[94m", # Blue
LogLevelEnum.info: "\033[92m", # Green
LogLevelEnum.warning: "\033[93m", # Yellow
LogLevelEnum.error: "\033[91m", # Red
LogLevelEnum.fatal: "\033[95m", # Magenta
}
def __init__(self, source: Source, file_prefix: str = None):
LoggerABC.__init__(self)
assert source is not None and source != "", "Source cannot be None or empty"
@@ -22,7 +31,18 @@ class Logger(LoggerABC):
file_prefix = "app"
self._file_prefix = file_prefix
self._writer = LogWriter.get_instance(self._file_prefix)
self._create_log_dir()
@property
def log_file(self):
return f"logs/{self._file_prefix}_{datetime.now().strftime('%Y-%m-%d')}.log"
@staticmethod
def _create_log_dir():
if os.path.exists("logs"):
return
os.makedirs("logs")
@classmethod
def set_level(cls, level: LogLevelEnum):
@@ -31,6 +51,24 @@ class Logger(LoggerABC):
else:
raise ValueError(f"Invalid log level: {level}")
@staticmethod
def _ensure_file_size(log_file: str):
if not os.path.exists(log_file) or os.path.getsize(log_file) <= 0.5 * 1024 * 1024:
return
# if exists and size is greater than 300MB, create a new file
os.rename(
log_file,
f"{log_file.split('.log')[0]}_{datetime.now().strftime('%H-%M-%S')}.log",
)
def _write_log_to_file(self, content: str):
file = self.log_file
self._ensure_file_size(file)
with open(file, "a") as log_file:
log_file.write(content + "\n")
log_file.close()
def _log(self, level: LogLevelEnum, *messages: Messages):
try:
if self._levels.index(level) < self._levels.index(self._level):
@@ -39,7 +77,8 @@ class Logger(LoggerABC):
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
formatted_message = self._format_message(level.value, timestamp, *messages)
self._writer.log(formatted_message)
self._write_log_to_file(formatted_message)
Console.write_line(f"{self._COLORS.get(self._level, '\033[0m')}{formatted_message}\033[0m")
except Exception as e:
print(f"Error while logging: {e} -> {traceback.format_exc()}")

View File

@@ -1,9 +1,13 @@
from cpl.core.pipes.pipe_abc import PipeABC
from cpl.core.typing import T
class BoolPipe(PipeABC):
def __init__(self):
pass
class BoolPipe[bool](PipeABC):
def transform(self, value: bool, *args):
return "True" if value else "False"
@staticmethod
def to_str(value: T, *args):
return str(value).lower()
@staticmethod
def from_str(value: str, *args) -> T:
return value in ("True", "true", "1", "yes", "y", "Y")

View File

@@ -1,20 +1,19 @@
from cpl.core.pipes.pipe_abc import PipeABC
from cpl.core.typing import T
class IPAddressPipe(PipeABC):
def __init__(self):
pass
def transform(self, value: list[int], *args):
class IPAddressPipe[list](PipeABC):
@staticmethod
def to_str(value: T, *args) -> str:
string = ""
if len(value) != 4:
raise Exception("Invalid IP")
raise ValueError("Invalid IP")
for i in range(0, len(value)):
byte = value[i]
if byte > 255 or byte < 0:
raise Exception("Invalid IP")
if not 0 <= byte <= 255:
raise ValueError("Invalid IP")
if i == len(value) - 1:
string += f"{byte}"
@@ -22,3 +21,18 @@ class IPAddressPipe(PipeABC):
string += f"{byte}."
return string
@staticmethod
def from_str(value: str, *args) -> T:
parts = value.split(".")
if len(parts) != 4:
raise Exception("Invalid IP")
result = []
for part in parts:
byte = int(part)
if not 0 <= byte <= 255:
raise Exception("Invalid IP")
result.append(byte)
return result

View File

@@ -1,11 +1,16 @@
from abc import ABC, abstractmethod
from typing import Generic
from cpl.core.typing import T
class PipeABC(ABC):
class PipeABC(ABC, Generic[T]):
@staticmethod
@abstractmethod
def __init__(self):
def to_str(value: T, *args) -> str:
pass
@staticmethod
@abstractmethod
def transform(self, value: any, *args):
def from_str(value: str, *args) -> T:
pass

View File

@@ -1,17 +0,0 @@
from cpl_cli.configuration import VersionSettingsNameEnum
from cpl.core.pipes.pipe_abc import PipeABC
class VersionPipe(PipeABC):
def __init__(self):
pass
def transform(self, value: dict, *args):
for atr in VersionSettingsNameEnum:
if atr.value not in value:
raise KeyError(atr.value)
v_str = f"{value[VersionSettingsNameEnum.major.value]}.{value[VersionSettingsNameEnum.minor.value]}"
if value[VersionSettingsNameEnum.micro.value] is not None:
v_str += f".{value[VersionSettingsNameEnum.micro.value]}"
return v_str

View File

@@ -1,9 +1,16 @@
from typing import TypeVar, Any
from uuid import UUID
T = TypeVar("T")
D = TypeVar("D")
R = TypeVar("R")
Service = TypeVar("Service")
Source = TypeVar("Source")
Messages = list[Any] | Any
UuidId = str | UUID
SerialId = int
Id = UuidId | SerialId

View File

@@ -1,3 +1,5 @@
from .base64 import Base64
from .credential_manager import CredentialManager
from .string import String
from .json_processor import JSONProcessor
from .pip import Pip
from .string import String

View File

@@ -0,0 +1,43 @@
import base64
from typing import Union
class Base64:
@staticmethod
def encode(string: str) -> str:
"""
Encode a string with base64
:param string:
:return:
"""
return base64.b64encode(string.encode("utf-8")).decode("utf-8")
@staticmethod
def decode(string: str) -> str:
"""
Decode a string with base64
:param string:
:return:
"""
return base64.b64decode(string).decode("utf-8")
@staticmethod
def is_b64(sb: Union[str, bytes]) -> bool:
"""
Check if a string is base64 encoded
:param Union[str, bytes] sb:
:return:
:rtype: bool
"""
try:
if isinstance(sb, str):
# If there's any unicode here, an exception will be thrown and the function will return false
sb_bytes = bytes(sb, "ascii")
elif isinstance(sb, bytes):
sb_bytes = sb
else:
raise ValueError("Argument must be string or bytes")
return base64.b64encode(base64.b64decode(sb_bytes)) == sb_bytes
except ValueError:
return False

View File

@@ -40,16 +40,9 @@ def get_value(
if cast_type == bool:
return value.lower() in ["true", "1"]
if (
cast_type if not hasattr(cast_type, "__origin__") else cast_type.__origin__
) == list:
if (
not (value.startswith("[") and value.endswith("]"))
and list_delimiter not in value
):
raise ValueError(
"List values must be enclosed in square brackets or use a delimiter."
)
if (cast_type if not hasattr(cast_type, "__origin__") else cast_type.__origin__) == list:
if not (value.startswith("[") and value.endswith("]")) and list_delimiter not in value:
raise ValueError("List values must be enclosed in square brackets or use a delimiter.")
if value.startswith("[") and value.endswith("]"):
value = value[1:-1]

View File

@@ -1,7 +1,7 @@
import enum
from inspect import signature, Parameter
from cpl.core.utils import String
from cpl.core.utils.string import String
class JSONProcessor:
@@ -16,7 +16,7 @@ class JSONProcessor:
if parameter.name == "self" or parameter.annotation == Parameter.empty:
continue
name = String.first_to_upper(String.convert_to_camel_case(parameter.name))
name = String.first_to_upper(String.to_camel_case(parameter.name))
name_first_lower = String.first_to_lower(name)
if name in values or name_first_lower in values or name.upper() in values:
value = ""

View File

@@ -1,13 +1,13 @@
import random
import re
import string
import random
class String:
r"""Useful functions for strings"""
@staticmethod
def convert_to_camel_case(chars: str) -> str:
def to_camel_case(s: str) -> str:
r"""Converts string to camel case
Parameter:
@@ -17,16 +17,10 @@ class String:
Returns:
String converted to CamelCase
"""
converted_name = chars
char_set = string.punctuation + " "
for char in char_set:
if char in converted_name:
converted_name = "".join(word.title() for word in converted_name.split(char))
return converted_name
return re.sub(r"(?<!^)(?=[A-Z])", "_", s).lower()
@staticmethod
def convert_to_snake_case(chars: str) -> str:
def to_snake_case(chars: str) -> str:
r"""Converts string to snake case
Parameter:
@@ -56,7 +50,7 @@ class String:
return re.sub(pattern2, r"\1_\2", file_name).lower()
@staticmethod
def first_to_upper(chars: str) -> str:
def first_to_upper(s: str) -> str:
r"""Converts first char to upper
Parameter:
@@ -66,10 +60,10 @@ class String:
Returns:
String with first char as upper
"""
return f"{chars[0].upper()}{chars[1:]}"
return s[0].upper() + s[1:] if s else s
@staticmethod
def first_to_lower(chars: str) -> str:
def first_to_lower(s: str) -> str:
r"""Converts first char to lower
Parameter:
@@ -79,14 +73,24 @@ class String:
Returns:
String with first char as lower
"""
return f"{chars[0].lower()}{chars[1:]}"
return s[0].lower() + s[1:] if s else s
@staticmethod
def random_string(chars: str, length: int) -> str:
def random(length: int, letters=True, digits=False, special_characters=False) -> str:
r"""Creates random string by given chars and length
Returns:
String of random chars
"""
return "".join(random.choice(chars) for _ in range(length))
characters = []
if letters:
characters.append(string.ascii_letters)
if digits:
characters.append(string.digits)
if special_characters:
characters.append(string.punctuation)
return "".join(random.choice(characters) for _ in range(length)) if characters else ""

View File

@@ -3,3 +3,4 @@ colorama==0.4.6
tabulate==0.9.0
termcolor==3.1.0
mysql-connector-python==9.4.0
pynput==1.8.1

View File

@@ -0,0 +1,41 @@
from typing import Type
from cpl.dependency import ServiceCollection as _ServiceCollection
from . import mysql as _mysql
from . import postgres as _postgres
from .internal_tables import InternalTables
def _add(collection: _ServiceCollection,db_context: Type, default_port: int, server_type: str):
from cpl.core.console import Console
from cpl.core.configuration import Configuration
from cpl.database.abc.db_context_abc import DBContextABC
from cpl.database.model.server_type import ServerTypes, ServerType
from cpl.database.model.database_settings import DatabaseSettings
from cpl.database.service.migration_service import MigrationService
from cpl.database.schema.executed_migration_dao import ExecutedMigrationDao
try:
ServerType.set_server_type(ServerTypes(server_type))
Configuration.set("DB_DEFAULT_PORT", default_port)
collection.add_singleton(DBContextABC, db_context)
collection.add_singleton(ExecutedMigrationDao)
collection.add_singleton(MigrationService)
except ImportError as e:
Console.error("cpl-database is not installed", str(e))
def add_mysql(collection: _ServiceCollection):
from cpl.database.mysql.db_context import DBContext
from cpl.database.model import ServerTypes
_add(collection, DBContext, 3306, ServerTypes.MYSQL.value)
def add_postgres(collection: _ServiceCollection):
from cpl.database.mysql.db_context import DBContext
from cpl.database.model import ServerTypes
_add(collection, DBContext, 5432, ServerTypes.POSTGRES.value)
_ServiceCollection.with_module(add_mysql, _mysql.__name__)
_ServiceCollection.with_module(add_postgres, _postgres.__name__)

View File

@@ -0,0 +1,68 @@
import textwrap
from typing import Callable
class ExternalDataTempTableBuilder:
def __init__(self):
self._table_name = None
self._fields: dict[str, str] = {}
self._primary_key = "id"
self._join_ref_table = None
self._value_getter = None
@property
def table_name(self) -> str:
return self._table_name
@property
def fields(self) -> dict[str, str]:
return self._fields
@property
def primary_key(self) -> str:
return self._primary_key
@property
def join_ref_table(self) -> str:
return self._join_ref_table
def with_table_name(self, table_name: str) -> "ExternalDataTempTableBuilder":
self._join_ref_table = table_name
if "." in table_name:
table_name = table_name.split(".")[-1]
if not table_name.endswith("_temp"):
table_name = f"{table_name}_temp"
self._table_name = table_name
return self
def with_field(self, name: str, sql_type: str, primary=False) -> "ExternalDataTempTableBuilder":
if primary:
sql_type += " PRIMARY KEY"
self._primary_key = name
self._fields[name] = sql_type
return self
def with_value_getter(self, value_getter: Callable) -> "ExternalDataTempTableBuilder":
self._value_getter = value_getter
return self
async def build(self) -> str:
assert self._table_name is not None, "Table name is required"
assert self._value_getter is not None, "Value getter is required"
values_str = ", ".join([f"{value}" for value in await self._value_getter()])
return textwrap.dedent(
f"""
DROP TABLE IF EXISTS {self._table_name};
CREATE TEMP TABLE {self._table_name} (
{", ".join([f"{k} {v}" for k, v in self._fields.items()])}
);
INSERT INTO {self._table_name} VALUES {values_str};
"""
)

View File

@@ -0,0 +1,5 @@
from .connection_abc import ConnectionABC
from .db_context_abc import DBContextABC
from .db_join_model_abc import DbJoinModelABC
from .db_model_abc import DbModelABC
from .db_model_dao_abc import DbModelDaoABC

View File

@@ -1,12 +1,12 @@
from abc import ABC, abstractmethod
from cpl.core.database.database_settings import DatabaseSettings
from cpl.database.model.database_settings import DatabaseSettings
from mysql.connector.abstracts import MySQLConnectionAbstract
from mysql.connector.cursor import MySQLCursorBuffered
class DatabaseConnectionABC(ABC):
r"""ABC for the :class:`cpl.core.database.connection.database_connection.DatabaseConnection`"""
class ConnectionABC(ABC):
r"""ABC for the :class:`cpl.database.connection.database_connection.DatabaseConnection`"""
@abstractmethod
def __init__(self):

View File

@@ -0,0 +1,875 @@
import datetime
from abc import ABC, abstractmethod
from enum import Enum
from types import NoneType
from typing import Generic, Optional, Union, Type, List, Any
from cpl.core.typing import T, Id
from cpl.core.utils import String
from cpl.core.utils.get_value import get_value
from cpl.database._external_data_temp_table_builder import ExternalDataTempTableBuilder
from cpl.database.abc.db_context_abc import DBContextABC
from cpl.database.const import DATETIME_FORMAT
from cpl.database.db_logger import DBLogger
from cpl.database.postgres.sql_select_builder import SQLSelectBuilder
from cpl.database.typing import T_DBM, Attribute, AttributeFilters, AttributeSorts
class DataAccessObjectABC(ABC, Generic[T_DBM]):
@abstractmethod
def __init__(self, source: str, model_type: Type[T_DBM], table_name: str):
from cpl.dependency.service_provider_abc import ServiceProviderABC
self._db = ServiceProviderABC.get_global_provider().get_service(DBContextABC)
self._logger = DBLogger(source)
self._model_type = model_type
self._table_name = table_name
self._logger = DBLogger(source)
self._model_type = model_type
self._table_name = table_name
self._default_filter_condition = None
self.__attributes: dict[str, type] = {}
self.__db_names: dict[str, str] = {}
self.__foreign_tables: dict[str, tuple[str, str]] = {}
self.__foreign_table_keys: dict[str, str] = {}
self.__foreign_dao: dict[str, "DataAccessObjectABC"] = {}
self.__date_attributes: set[str] = set()
self.__ignored_attributes: set[str] = set()
self.__primary_key = "id"
self.__primary_key_type = int
self._external_fields: dict[str, ExternalDataTempTableBuilder] = {}
@property
def table_name(self) -> str:
return self._table_name
def has_attribute(self, attr_name: Attribute) -> bool:
"""
Check if the attribute exists in the DAO
:param Attribute attr_name: Name of the attribute
:return: True if the attribute exists, False otherwise
"""
return attr_name in self.__attributes
def attribute(
self,
attr_name: Attribute,
attr_type: type,
db_name: str = None,
ignore=False,
primary_key=False,
aliases: list[str] = None,
):
"""
Add an attribute for db and object mapping to the data access object
:param Attribute attr_name: Name of the attribute in the object
:param type attr_type: Python type of the attribute to cast db value to
:param str db_name: Name of the field in the database, if None the attribute lowered attr_name without "_" is used
:param bool ignore: Defines if field is ignored for create and update (for e.g. auto increment fields or created/updated fields)
:param bool primary_key: Defines if field is the primary key
:param list[str] aliases: List of aliases for the attribute name
:return:
"""
if isinstance(attr_name, property):
attr_name = attr_name.fget.__name__
self.__attributes[attr_name] = attr_type
if ignore:
self.__ignored_attributes.add(attr_name)
if not db_name:
db_name = attr_name.lower().replace("_", "")
self.__db_names[attr_name] = db_name
self.__db_names[db_name] = db_name
if aliases is not None:
for alias in aliases:
if alias in self.__db_names:
raise ValueError(f"Alias {alias} already exists")
self.__db_names[alias] = db_name
if primary_key:
self.__primary_key = db_name
self.__primary_key_type = attr_type
if attr_type in [datetime, datetime.datetime]:
self.__date_attributes.add(attr_name)
self.__date_attributes.add(db_name)
def reference(
self,
attr: Attribute,
primary_attr: Attribute,
foreign_attr: Attribute,
table_name: str,
reference_dao: "DataAccessObjectABC" = None,
):
"""
Add a reference to another table for the given attribute
:param Attribute attr: Name of the attribute in the object
:param str primary_attr: Name of the primary key in the foreign object
:param str foreign_attr: Name of the foreign key in the object
:param str table_name: Name of the table to reference
:param DataAccessObjectABC reference_dao: The data access object for the referenced table
:return:
"""
if isinstance(attr, property):
attr = attr.fget.__name__
if isinstance(primary_attr, property):
primary_attr = primary_attr.fget.__name__
primary_attr = primary_attr.lower().replace("_", "")
if isinstance(foreign_attr, property):
foreign_attr = foreign_attr.fget.__name__
foreign_attr = foreign_attr.lower().replace("_", "")
self.__foreign_table_keys[attr] = foreign_attr
if reference_dao is not None:
self.__foreign_dao[attr] = reference_dao
if table_name == self._table_name:
return
self.__foreign_tables[attr] = (
table_name,
f"{table_name}.{primary_attr} = {self._table_name}.{foreign_attr}",
)
def use_external_fields(self, builder: ExternalDataTempTableBuilder):
self._external_fields[builder.table_name] = builder
def to_object(self, result: dict) -> T_DBM:
"""
Convert a result from the database to an object
:param dict result: Result from the database
:return:
"""
value_map: dict[str, T] = {}
for db_name, value in result.items():
# Find the attribute name corresponding to the db_name
attr_name = next((k for k, v in self.__db_names.items() if v == db_name), None)
if attr_name:
value_map[attr_name] = self._get_value_from_sql(self.__attributes[attr_name], value)
return self._model_type(**value_map)
def to_dict(self, obj: T_DBM) -> dict:
"""
Convert an object to a dictionary
:param T_DBM obj: Object to convert
:return:
"""
value_map: dict[str, Any] = {}
for attr_name, attr_type in self.__attributes.items():
value = getattr(obj, attr_name)
if isinstance(value, datetime.datetime):
value = value.strftime(DATETIME_FORMAT)
elif isinstance(value, Enum):
value = value.value
value_map[attr_name] = value
for ex_fname in self._external_fields:
ex_field = self._external_fields[ex_fname]
for ex_attr in ex_field.fields:
if ex_attr == self.__primary_key:
continue
value_map[ex_attr] = getattr(obj, ex_attr, None)
return value_map
async def count(self, filters: AttributeFilters = None) -> int:
result = await self._prepare_query(filters=filters, for_count=True)
return result[0]["count"] if result else 0
async def get_history(
self,
entry_id: int,
by_key: str = None,
when: datetime = None,
until: datetime = None,
without_deleted: bool = False,
) -> list[T_DBM]:
"""
Retrieve the history of an entry from the history table.
:param entry_id: The ID of the entry to retrieve history for.
:param by_key: The key to filter by (default is the primary key).
:param when: A specific timestamp to filter the history.
:param until: A timestamp to filter history entries up to a certain point.
:param without_deleted: Exclude deleted entries if True.
:return: A list of historical entries as objects.
"""
f_tables = list(self.__foreign_tables.keys())
history_table = f"{self._table_name}_history"
builder = SQLSelectBuilder(history_table, self.__primary_key)
builder.with_attribute("*")
builder.with_value_condition(
f"{history_table}.{by_key or self.__primary_key}",
"=",
str(entry_id),
f_tables,
)
if self._default_filter_condition:
builder.with_condition(self._default_filter_condition, "", f_tables)
if without_deleted:
builder.with_value_condition(f"{history_table}.deleted", "=", "false", f_tables)
if when:
builder.with_value_condition(
self._attr_from_date_to_char(f"{history_table}.updated"),
"=",
f"'{when.strftime(DATETIME_FORMAT)}'",
f_tables,
)
if until:
builder.with_value_condition(
self._attr_from_date_to_char(f"{history_table}.updated"),
"<=",
f"'{until.strftime(DATETIME_FORMAT)}'",
f_tables,
)
builder.with_order_by(f"{history_table}.updated", "DESC")
query = await builder.build()
result = await self._db.select_map(query)
return [self.to_object(x) for x in result] if result else []
async def get_all(self) -> List[T_DBM]:
result = await self._prepare_query(sorts=[{self.__primary_key: "asc"}])
return [self.to_object(x) for x in result] if result else []
async def get_by_id(self, id: Union[int, str]) -> Optional[T_DBM]:
result = await self._prepare_query(filters=[{self.__primary_key: id}], sorts=[{self.__primary_key: "asc"}])
return self.to_object(result[0]) if result else None
async def find_by_id(self, id: Union[int, str]) -> Optional[T_DBM]:
result = await self._prepare_query(filters=[{self.__primary_key: id}], sorts=[{self.__primary_key: "asc"}])
return self.to_object(result[0]) if result else None
async def get_by(
self,
filters: AttributeFilters = None,
sorts: AttributeSorts = None,
take: int = None,
skip: int = None,
) -> list[T_DBM]:
result = await self._prepare_query(filters, sorts, take, skip)
if not result or len(result) == 0:
raise ValueError("No result found")
return [self.to_object(x) for x in result] if result else []
async def get_single_by(
self,
filters: AttributeFilters = None,
sorts: AttributeSorts = None,
take: int = None,
skip: int = None,
) -> T_DBM:
result = await self._prepare_query(filters, sorts, take, skip)
if not result:
raise ValueError("No result found")
if len(result) > 1:
raise ValueError("More than one result found")
return self.to_object(result[0])
async def find_by(
self,
filters: AttributeFilters = None,
sorts: AttributeSorts = None,
take: int = None,
skip: int = None,
) -> list[T_DBM]:
result = await self._prepare_query(filters, sorts, take, skip)
return [self.to_object(x) for x in result] if result else []
async def find_single_by(
self,
filters: AttributeFilters = None,
sorts: AttributeSorts = None,
take: int = None,
skip: int = None,
) -> Optional[T_DBM]:
result = await self._prepare_query(filters, sorts, take, skip)
if len(result) > 1:
raise ValueError("More than one result found")
return self.to_object(result[0]) if result else None
async def touch(self, obj: T_DBM):
"""
Touch the entry to update the last updated date
:return:
"""
await self._db.execute(
f"""
UPDATE {self._table_name}
SET updated = NOW()
WHERE {self.__primary_key} = {self._get_primary_key_value_sql(obj)};
"""
)
async def touch_many_by_id(self, ids: list[Id]):
"""
Touch the entries to update the last updated date
:return:
"""
if len(ids) == 0:
return
await self._db.execute(
f"""
UPDATE {self._table_name}
SET updated = NOW()
WHERE {self.__primary_key} IN ({", ".join([str(x) for x in ids])});
"""
)
async def _build_create_statement(self, obj: T_DBM, skip_editor=False) -> str:
allowed_fields = [x for x in self.__attributes.keys() if x not in self.__ignored_attributes]
fields = ", ".join([self.__db_names[x] for x in allowed_fields])
fields = f"{'EditorId' if not skip_editor else ''}{f', {fields}' if not skip_editor and len(fields) > 0 else f'{fields}'}"
values = ", ".join([self._get_value_sql(getattr(obj, x)) for x in allowed_fields])
values = f"{await self._get_editor_id(obj) if not skip_editor else ''}{f', {values}' if not skip_editor and len(values) > 0 else f'{values}'}"
return f"""
INSERT INTO {self._table_name} (
{fields}
) VALUES (
{values}
)
RETURNING {self.__primary_key};
"""
async def create(self, obj: T_DBM, skip_editor=False) -> int:
self._logger.debug(f"create {type(obj).__name__} {obj.__dict__}")
result = await self._db.execute(await self._build_create_statement(obj, skip_editor))
return self._get_value_from_sql(self.__primary_key_type, result[0][0])
async def create_many(self, objs: list[T_DBM], skip_editor=False) -> list[int]:
if len(objs) == 0:
return []
self._logger.debug(f"create many {type(objs[0]).__name__} {len(objs)} {[x.__dict__ for x in objs]}")
query = ""
for obj in objs:
query += await self._build_create_statement(obj, skip_editor)
result = await self._db.execute(query)
return [self._get_value_from_sql(self.__primary_key_type, x[0]) for x in result]
async def _build_update_statement(self, obj: T_DBM, skip_editor=False) -> str:
allowed_fields = [x for x in self.__attributes.keys() if x not in self.__ignored_attributes]
fields = ", ".join(
[f"{self.__db_names[x]} = {self._get_value_sql(getattr(obj, x, None))}" for x in allowed_fields]
)
fields = f"{f'EditorId = {await self._get_editor_id(obj)}' if not skip_editor else ''}{f', {fields}' if not skip_editor and len(fields) > 0 else f'{fields}'}"
return f"""
UPDATE {self._table_name}
SET {fields}
WHERE {self.__primary_key} = {self._get_primary_key_value_sql(obj)};
"""
async def update(self, obj: T_DBM, skip_editor=False):
self._logger.debug(f"update {type(obj).__name__} {obj.__dict__}")
await self._db.execute(await self._build_update_statement(obj, skip_editor))
async def update_many(self, objs: list[T_DBM], skip_editor=False):
if len(objs) == 0:
return
self._logger.debug(f"update many {type(objs[0]).__name__} {len(objs)} {[x.__dict__ for x in objs]}")
query = ""
for obj in objs:
query += await self._build_update_statement(obj, skip_editor)
await self._db.execute(query)
async def _build_delete_statement(self, obj: T_DBM, hard_delete: bool = False) -> str:
if hard_delete:
return f"""
DELETE FROM {self._table_name}
WHERE {self.__primary_key} = {self._get_primary_key_value_sql(obj)};
"""
return f"""
UPDATE {self._table_name}
SET EditorId = {await self._get_editor_id(obj)},
Deleted = true
WHERE {self.__primary_key} = {self._get_primary_key_value_sql(obj)};
"""
async def delete(self, obj: T_DBM, hard_delete: bool = False):
self._logger.debug(f"delete {type(obj).__name__} {obj.__dict__}")
await self._db.execute(await self._build_delete_statement(obj, hard_delete))
async def delete_many(self, objs: list[T_DBM], hard_delete: bool = False):
if len(objs) == 0:
return
self._logger.debug(f"delete many {type(objs[0]).__name__} {len(objs)} {[x.__dict__ for x in objs]}")
query = ""
for obj in objs:
query += await self._build_delete_statement(obj, hard_delete)
await self._db.execute(query)
async def _build_restore_statement(self, obj: T_DBM) -> str:
return f"""
UPDATE {self._table_name}
SET EditorId = {await self._get_editor_id(obj)},
Deleted = false
WHERE {self.__primary_key} = {self._get_primary_key_value_sql(obj)};
"""
async def restore(self, obj: T_DBM):
self._logger.debug(f"restore {type(obj).__name__} {obj.__dict__}")
await self._db.execute(await self._build_restore_statement(obj))
async def restore_many(self, objs: list[T_DBM]):
if len(objs) == 0:
return
self._logger.debug(f"restore many {type(objs[0]).__name__} {len(objs)} {objs[0].__dict__}")
query = ""
for obj in objs:
query += await self._build_restore_statement(obj)
await self._db.execute(query)
async def _prepare_query(
self,
filters: AttributeFilters = None,
sorts: AttributeSorts = None,
take: int = None,
skip: int = None,
for_count=False,
) -> list[dict]:
"""
Prepares and executes a query using the SQLBuilder with the given parameters.
:param filters: Conditions to filter the query.
:param sorts: Sorting attributes and directions.
:param take: Limit the number of results.
:param skip: Offset the results.
:return: Query result as a list of dictionaries.
"""
external_table_deps = []
builder = SQLSelectBuilder(self._table_name, self.__primary_key)
for temp in self._external_fields:
builder.with_temp_table(self._external_fields[temp])
if for_count:
builder.with_attribute("COUNT(*)", ignore_table_name=True)
else:
builder.with_attribute("*")
for attr in self.__foreign_tables:
table, join_condition = self.__foreign_tables[attr]
builder.with_left_join(table, join_condition)
if filters:
await self._build_conditions(builder, filters, external_table_deps)
if sorts:
self._build_sorts(builder, sorts, external_table_deps)
if take:
builder.with_limit(take)
if skip:
builder.with_offset(skip)
for external_table in external_table_deps:
builder.use_temp_table(external_table)
query = await builder.build()
return await self._db.select_map(query)
async def _build_conditions(
self,
builder: SQLSelectBuilder,
filters: AttributeFilters,
external_table_deps: list[str],
):
"""
Builds SQL conditions from GraphQL-like filters and adds them to the SQLBuilder.
:param builder: The SQLBuilder instance to add conditions to.
:param filters: GraphQL-like filter structure.
:param external_table_deps: List to store external table dependencies.
"""
if not isinstance(filters, list):
filters = [filters]
for filter_group in filters:
sql_conditions = self._graphql_to_sql_conditions(filter_group, external_table_deps)
for attr, operator, value in sql_conditions:
if attr in self.__foreign_table_keys:
attr = self.__foreign_table_keys[attr]
recursive_join = self._get_recursive_reference_join(attr)
if recursive_join is not None:
builder.with_left_join(*recursive_join)
external_table = self._get_external_field_key(attr)
if external_table is not None:
external_table_deps.append(external_table)
if operator == "fuzzy":
builder.with_levenshtein_condition(attr)
elif operator in [
"IS NULL",
"IS NOT NULL",
]: # operator without value
builder.with_condition(
attr,
operator,
[
x[0]
for fdao in self.__foreign_dao
for x in self.__foreign_dao[fdao].__foreign_tables.values()
],
)
else:
if attr in self.__date_attributes or String.to_snake_case(attr) in self.__date_attributes:
attr = self._attr_from_date_to_char(f"{self._table_name}.{attr}")
builder.with_value_condition(
attr,
operator,
self._get_value_sql(value),
[
x[0]
for fdao in self.__foreign_dao
for x in self.__foreign_dao[fdao].__foreign_tables.values()
],
)
def _graphql_to_sql_conditions(
self, graphql_structure: dict, external_table_deps: list[str]
) -> list[tuple[str, str, Any]]:
"""
Converts a GraphQL-like structure to SQL conditions.
:param graphql_structure: The GraphQL-like filter structure.
:param external_table_deps: List to track external table dependencies.
:return: A list of tuples (attribute, operator, value).
"""
operators = {
"equal": "=",
"notEqual": "!=",
"greater": ">",
"greaterOrEqual": ">=",
"less": "<",
"lessOrEqual": "<=",
"isNull": "IS NULL",
"isNotNull": "IS NOT NULL",
"contains": "LIKE", # Special handling in _graphql_to_sql_conditions
"notContains": "NOT LIKE", # Special handling in _graphql_to_sql_conditions
"startsWith": "LIKE", # Special handling in _graphql_to_sql_conditions
"endsWith": "LIKE", # Special handling in _graphql_to_sql_conditions
"in": "IN",
"notIn": "NOT IN",
}
conditions = []
def parse_node(node, parent_key=None, parent_dao=None):
if not isinstance(node, dict):
return
if isinstance(node, list):
conditions.append((parent_key, "IN", node))
return
for key, value in node.items():
if isinstance(key, property):
key = key.fget.__name__
external_fields_table_name_by_parent = self._get_external_field_key(parent_key)
external_fields_table_name = self._get_external_field_key(key)
external_field = (
external_fields_table_name
if external_fields_table_name_by_parent is None
else external_fields_table_name_by_parent
)
if key == "fuzzy":
self._handle_fuzzy_filter_conditions(conditions, external_table_deps, value)
elif parent_dao is not None and key in parent_dao.__db_names:
parse_node(value, f"{parent_dao.table_name}.{key}")
continue
elif external_field is not None:
external_table_deps.append(external_field)
parse_node(value, f"{external_field}.{key}")
elif parent_key in self.__foreign_table_keys:
if key in operators:
parse_node({key: value}, self.__foreign_table_keys[parent_key])
continue
if parent_key in self.__foreign_dao:
foreign_dao = self.__foreign_dao[parent_key]
if key in foreign_dao.__foreign_tables:
parse_node(
value,
f"{self.__foreign_tables[parent_key][0]}.{foreign_dao.__foreign_table_keys[key]}",
foreign_dao.__foreign_dao[key],
)
continue
if parent_key in self.__foreign_tables:
parse_node(value, f"{self.__foreign_tables[parent_key][0]}.{key}")
continue
parse_node({parent_key: value})
elif key in operators:
operator = operators[key]
if key == "contains" or key == "notContains":
value = f"%{value}%"
elif key == "in" or key == "notIn":
value = value
elif key == "startsWith":
value = f"{value}%"
elif key == "endsWith":
value = f"%{value}"
elif key == "isNull" or key == "isNotNull":
is_null_value = value.get("equal", None) if isinstance(value, dict) else value
if is_null_value is None:
operator = operators[key]
elif (key == "isNull" and is_null_value) or (key == "isNotNull" and not is_null_value):
operator = "IS NULL"
else:
operator = "IS NOT NULL"
conditions.append((parent_key, operator, None))
elif (key == "equal" or key == "notEqual") and value is None:
operator = operators["isNull"]
conditions.append((parent_key, operator, value))
elif isinstance(value, dict):
if key in self.__foreign_table_keys:
parse_node(value, key)
elif key in self.__db_names and parent_key is not None:
parse_node({f"{parent_key}": value})
elif key in self.__db_names:
parse_node(value, self.__db_names[key])
else:
parse_node(value, key)
elif value is None:
conditions.append((self.__db_names[key], "IS NULL", value))
else:
conditions.append((self.__db_names[key], "=", value))
parse_node(graphql_structure)
return conditions
def _handle_fuzzy_filter_conditions(self, conditions, external_field_table_deps, sub_values):
# Extract fuzzy filter parameters
fuzzy_fields = get_value(sub_values, "fields", list[str])
fuzzy_term = get_value(sub_values, "term", str)
fuzzy_threshold = get_value(sub_values, "threshold", int, 5)
if not fuzzy_fields or not fuzzy_term:
raise ValueError("Fuzzy filter must include 'fields' and 'term'.")
fuzzy_fields_db_names = []
# Map fields to their database names
for fuzzy_field in fuzzy_fields:
external_fields_table_name = self._get_external_field_key(fuzzy_field)
if external_fields_table_name is not None:
external_fields_table = self._external_fields[external_fields_table_name]
fuzzy_fields_db_names.append(f"{external_fields_table.table_name}.{fuzzy_field}")
external_field_table_deps.append(external_fields_table.table_name)
elif fuzzy_field in self.__db_names:
fuzzy_fields_db_names.append(f"{self._table_name}.{self.__db_names[fuzzy_field]}")
elif fuzzy_field in self.__foreign_tables:
fuzzy_fields_db_names.append(f"{self._table_name}.{self.__foreign_table_keys[fuzzy_field]}")
else:
fuzzy_fields_db_names.append(self.__db_names[String.to_snake_case(fuzzy_field)][0])
# Build fuzzy conditions for each field
fuzzy_conditions = self._build_fuzzy_conditions(fuzzy_fields_db_names, fuzzy_term, fuzzy_threshold)
# Combine conditions with OR and append to the main conditions
conditions.append((f"({' OR '.join(fuzzy_conditions)})", "fuzzy", None))
@staticmethod
def _build_fuzzy_conditions(fields: list[str], term: str, threshold: int = 10) -> list[str]:
conditions = []
for field in fields:
conditions.append(f"levenshtein({field}::TEXT, '{term}') <= {threshold}") # Adjust the threshold as needed
return conditions
def _get_external_field_key(self, field_name: str) -> Optional[str]:
"""
Returns the key to get the external field if found, otherwise None.
:param str field_name: The name of the field to search for.
:return: The key if found, otherwise None.
:rtype: Optional[str]
"""
if field_name is None:
return None
for key, builder in self._external_fields.items():
if field_name in builder.fields and field_name not in self.__db_names:
return key
return None
def _get_recursive_reference_join(self, attr: str) -> Optional[tuple[str, str]]:
parts = attr.split(".")
table_name = ".".join(parts[:-1])
if table_name == self._table_name or table_name == "":
return None
all_foreign_tables = {
x[0]: x[1]
for x in [
*[x for x in self.__foreign_tables.values() if x[0] != self._table_name],
*[x for fdao in self.__foreign_dao for x in self.__foreign_dao[fdao].__foreign_tables.values()],
]
}
if not table_name in all_foreign_tables:
return None
return table_name, all_foreign_tables[table_name]
def _build_sorts(
self,
builder: SQLSelectBuilder,
sorts: AttributeSorts,
external_table_deps: list[str],
):
"""
Resolves complex sorting structures into SQL-compatible sorting conditions.
Tracks external table dependencies.
:param builder: The SQLBuilder instance to add sorting to.
:param sorts: Sorting attributes and directions in a complex structure.
:param external_table_deps: List to track external table dependencies.
"""
def parse_sort_node(node, parent_key=None):
if isinstance(node, dict):
for key, value in node.items():
if isinstance(value, dict):
# Recursively parse nested structures
parse_sort_node(value, key)
elif isinstance(value, str) and value.lower() in ["asc", "desc"]:
external_table = self._get_external_field_key(key)
if external_table:
external_table_deps.append(external_table)
key = f"{external_table}.{key}"
if parent_key in self.__foreign_tables:
key = f"{self.__foreign_tables[parent_key][0]}.{key}"
builder.with_order_by(key, value.upper())
else:
raise ValueError(f"Invalid sort direction: {value}")
elif isinstance(node, list):
for item in node:
parse_sort_node(item)
else:
raise ValueError(f"Invalid sort structure: {node}")
parse_sort_node(sorts)
def _get_value_sql(self, value: Any) -> str:
if isinstance(value, str):
if value.lower() == "null":
return "NULL"
return f"'{value}'"
if isinstance(value, NoneType):
return "NULL"
if value is None:
return "NULL"
if isinstance(value, Enum):
return f"'{value.value}'"
if isinstance(value, bool):
return "true" if value else "false"
if isinstance(value, list):
if len(value) == 0:
return "()"
return f"({', '.join([self._get_value_sql(x) for x in value])})"
if isinstance(value, datetime.datetime):
if value.tzinfo is None:
value = value.replace(tzinfo=datetime.timezone.utc)
return f"'{value.strftime(DATETIME_FORMAT)}'"
return str(value)
@staticmethod
def _get_value_from_sql(cast_type: type, value: Any) -> Optional[T]:
"""
Get the value from the query result and cast it to the correct type
:param type cast_type:
:param Any value:
:return Optional[T]: Casted value, when value is str "NULL" None is returned
"""
if isinstance(value, str) and "NULL" in value:
return None
if isinstance(value, NoneType):
return None
if isinstance(value, cast_type):
return value
return cast_type(value)
def _get_primary_key_value_sql(self, obj: T_DBM) -> str:
value = getattr(obj, self.__primary_key)
if isinstance(value, str):
return f"'{value}'"
return value
@staticmethod
def _attr_from_date_to_char(attr: str) -> str:
return f"TO_CHAR({attr}, 'YYYY-MM-DD HH24:MI:SS.US TZ')"
@staticmethod
async def _get_editor_id(obj: T_DBM):
editor_id = obj.editor_id
# if editor_id is None:
# user = get_user()
# if user is not None:
# editor_id = user.id
return editor_id if editor_id is not None else "NULL"

View File

@@ -0,0 +1,53 @@
from abc import ABC, abstractmethod
from typing import Any
from cpl.database.model.database_settings import DatabaseSettings
class DBContextABC(ABC):
r"""ABC for the :class:`cpl.database.context.database_context.DatabaseContext`"""
@abstractmethod
def connect(self, database_settings: DatabaseSettings):
r"""Connects to a database by connection settings
Parameter:
database_settings :class:`cpl.database.database_settings.DatabaseSettings`
"""
@abstractmethod
async def execute(self, statement: str, args=None, multi=True) -> list[list]:
r"""Runs SQL Statements
Parameter:
statement: :class:`str`
args: :class:`list` | :class:`tuple` | :class:`dict` | :class:`None`
multi: :class:`bool`
Returns:
list: Fetched list of executed elements
"""
@abstractmethod
async def select_map(self, statement: str, args=None) -> list[dict]:
r"""Runs SQL Select Statements and returns a list of dictionaries
Parameter:
statement: :class:`str`
args: :class:`list` | :class:`tuple` | :class:`dict` | :class:`None`
Returns:
list: Fetched list of executed elements as dictionary
"""
@abstractmethod
async def select(self, statement: str, args=None) -> list[str] | list[tuple] | list[Any]:
r"""Runs SQL Select Statements and returns a list of dictionaries
Parameter:
statement: :class:`str`
args: :class:`list` | :class:`tuple` | :class:`dict` | :class:`None`
Returns:
list: Fetched list of executed elements
"""

View File

@@ -0,0 +1,30 @@
from datetime import datetime
from typing import Optional
from cpl.core.typing import Id, SerialId
from cpl.database.abc.db_model_abc import DbModelABC
class DbJoinModelABC[T](DbModelABC[T]):
def __init__(
self,
id: Id,
source_id: Id,
foreign_id: Id,
deleted: bool = False,
editor_id: Optional[SerialId] = None,
created: Optional[datetime] = None,
updated: Optional[datetime] = None,
):
DbModelABC.__init__(self, id, deleted, editor_id, created, updated)
self._source_id = source_id
self._foreign_id = foreign_id
@property
def source_id(self) -> Id:
return self._source_id
@property
def foreign_id(self) -> Id:
return self._foreign_id

View File

@@ -0,0 +1,79 @@
from abc import ABC
from datetime import datetime, timezone
from typing import Optional, Generic
from cpl.core.typing import Id, SerialId, T
class DbModelABC(ABC, Generic[T]):
def __init__(
self,
id: Id,
deleted: bool = False,
editor_id: Optional[SerialId] = None,
created: Optional[datetime] = None,
updated: Optional[datetime] = None,
):
self._id = id
self._deleted = deleted
self._editor_id = editor_id
self._created = created if created is not None else datetime.now(timezone.utc).isoformat()
self._updated = updated if updated is not None else datetime.now(timezone.utc).isoformat()
@property
def id(self) -> Id:
return self._id
@property
def deleted(self) -> bool:
return self._deleted
@deleted.setter
def deleted(self, value: bool):
self._deleted = value
@property
def editor_id(self) -> SerialId:
return self._editor_id
@editor_id.setter
def editor_id(self, value: SerialId):
self._editor_id = value
# @async_property
# async def editor(self):
# if self._editor_id is None:
# return None
#
# from data.schemas.administration.user_dao import userDao
#
# return await userDao.get_by_id(self._editor_id)
@property
def created(self) -> datetime:
return self._created
@property
def updated(self) -> datetime:
return self._updated
@updated.setter
def updated(self, value: datetime):
self._updated = value
def to_dict(self) -> dict:
result = {}
for name, value in self.__dict__.items():
if not name.startswith("_") or name.endswith("_"):
continue
if isinstance(value, datetime):
value = value.isoformat()
if not isinstance(value, str):
value = str(value)
result[name.replace("_", "")] = value
return result

View File

@@ -0,0 +1,25 @@
from abc import abstractmethod
from datetime import datetime
from typing import Type
from cpl.database.abc.data_access_object_abc import DataAccessObjectABC
from cpl.database.abc.db_model_abc import DbModelABC
from cpl.database.internal_tables import InternalTables
class DbModelDaoABC[T_DBM](DataAccessObjectABC[T_DBM]):
@abstractmethod
def __init__(self, source: str, model_type: Type[T_DBM], table_name: str):
DataAccessObjectABC.__init__(self, source, model_type, table_name)
self.attribute(DbModelABC.id, int, ignore=True)
self.attribute(DbModelABC.deleted, bool)
self.attribute(DbModelABC.editor_id, int, ignore=True) # handled by db trigger
self.reference(
"editor", "id", DbModelABC.editor_id, InternalTables.users
) # not relevant for updates due to editor_id
self.attribute(DbModelABC.created, datetime, ignore=True) # handled by db trigger
self.attribute(DbModelABC.updated, datetime, ignore=True) # handled by db trigger

View File

@@ -0,0 +1 @@
DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S.%f %z"

View File

@@ -5,4 +5,4 @@ from cpl.core.typing import Source
class DBLogger(Logger):
def __init__(self, source: Source):
Logger.__init__(self, source, "db")
Logger.__init__(self, source, "db")

View File

@@ -0,0 +1,15 @@
from cpl.database.model.server_type import ServerTypes, ServerType
class InternalTables:
@classmethod
@property
def users(cls) -> str:
return "administration.users" if ServerType.server_type is ServerTypes.POSTGRES else "users"
@classmethod
@property
def executed_migrations(cls) -> str:
return "system._executed_migrations" if ServerType.server_type is ServerTypes.POSTGRES else "_executed_migrations"

View File

@@ -0,0 +1,3 @@
from .database_settings import DatabaseSettings
from .migration import Migration
from .server_type import ServerTypes

View File

@@ -1,6 +1,9 @@
from typing import Optional
from cpl.core.configuration import Configuration
from cpl.core.configuration.configuration_model_abc import ConfigurationModelABC
from cpl.core.environment import Environment
from cpl.core.utils import Base64
class DatabaseSettings(ConfigurationModelABC):
@@ -8,23 +11,23 @@ class DatabaseSettings(ConfigurationModelABC):
def __init__(
self,
host: str = None,
port: int = 3306,
user: str = None,
password: str = None,
database: str = None,
charset: str = "utf8mb4",
use_unicode: bool = False,
buffered: bool = False,
auth_plugin: str = "caching_sha2_password",
ssl_disabled: bool = False,
host: str = Environment.get("DB_HOST", str),
port: int = Environment.get("DB_PORT", str, Configuration.get("DB_DEFAULT_PORT", 0)),
user: str = Environment.get("DB_USER", str),
password: str = Environment.get("DB_PASSWORD", str),
database: str = Environment.get("DB_DATABASE", str),
charset: str = Environment.get("DB_CHARSET", str, "utf8mb4"),
use_unicode: bool = Environment.get("DB_USE_UNICODE", bool, False),
buffered: bool = Environment.get("DB_BUFFERED", bool, False),
auth_plugin: str = Environment.get("DB_AUTH_PLUGIN", str, "caching_sha2_password"),
ssl_disabled: bool = Environment.get("DB_SSL_DISABLED", bool, False),
):
ConfigurationModelABC.__init__(self)
self._host: Optional[str] = host
self._port: Optional[int] = port
self._user: Optional[str] = user
self._password: Optional[str] = password
self._password: Optional[str] = Base64.decode(password) if Base64.is_b64(password) else password
self._database: Optional[str] = database
self._charset: Optional[str] = charset
self._use_unicode: Optional[bool] = use_unicode

View File

@@ -0,0 +1,12 @@
class Migration:
def __init__(self, name: str, script: str):
self._name = name
self._script = script
@property
def name(self) -> str:
return self._name
@property
def script(self) -> str:
return self._script

View File

@@ -0,0 +1,21 @@
from enum import Enum
class ServerTypes(Enum):
POSTGRES = "postgres"
MYSQL = "mysql"
class ServerType:
_server_type: ServerTypes = None
@classmethod
def set_server_type(cls, server_type: ServerTypes):
assert server_type is not None, "server_type must not be None"
assert isinstance(server_type, ServerTypes), f"Expected ServerType but got {type(server_type)}"
cls._server_type = server_type
@classmethod
@property
def server_type(cls) -> ServerTypes:
assert cls._server_type is not None, "Server type is not set"
return cls._server_type

View File

@@ -4,16 +4,16 @@ import mysql.connector as sql
from mysql.connector.abstracts import MySQLConnectionAbstract
from mysql.connector.cursor import MySQLCursorBuffered
from cpl.core.database.connection.database_connection_abc import DatabaseConnectionABC
from cpl.core.database.database_settings import DatabaseSettings
from cpl.database.abc.connection_abc import ConnectionABC
from cpl.database.database_settings import DatabaseSettings
from cpl.core.utils.credential_manager import CredentialManager
class DatabaseConnection(DatabaseConnectionABC):
class DatabaseConnection(ConnectionABC):
r"""Representation of the database connection"""
def __init__(self):
DatabaseConnectionABC.__init__(self)
ConnectionABC.__init__(self)
self._database: Optional[MySQLConnectionAbstract] = None
self._cursor: Optional[MySQLCursorBuffered] = None

View File

@@ -0,0 +1,84 @@
import uuid
from typing import Any, List, Dict, Tuple, Union
from mysql.connector import Error as MySQLError, PoolError
from cpl.core.configuration import Configuration
from cpl.core.environment import Environment
from cpl.database.abc.db_context_abc import DBContextABC
from cpl.database.db_logger import DBLogger
from cpl.database.model.database_settings import DatabaseSettings
from cpl.database.mysql.mysql_pool import MySQLPool
_logger = DBLogger(__name__)
class DBContext(DBContextABC):
def __init__(self):
DBContextABC.__init__(self)
self._pool: MySQLPool = None
self._fails = 0
self.connect(Configuration.get(DatabaseSettings))
def connect(self, database_settings: DatabaseSettings):
try:
_logger.debug("Connecting to database")
self._pool = MySQLPool(
database_settings,
)
_logger.info("Connected to database")
except Exception as e:
_logger.fatal("Connecting to database failed", e)
async def execute(self, statement: str, args=None, multi=True) -> List[List]:
_logger.trace(f"execute {statement} with args: {args}")
return await self._pool.execute(statement, args, multi)
async def select_map(self, statement: str, args=None) -> List[Dict]:
_logger.trace(f"select {statement} with args: {args}")
try:
return await self._pool.select_map(statement, args)
except (MySQLError, PoolError) as e:
if self._fails >= 3:
_logger.error(f"Database error caused by `{statement}`", e)
uid = uuid.uuid4()
raise Exception(
f"Query failed three times with {type(e).__name__}. Contact an admin with the UID: {uid}"
)
_logger.error(f"Database error caused by `{statement}`", e)
self._fails += 1
try:
_logger.debug("Retry select")
return await self.select_map(statement, args)
except Exception as e:
pass
return []
except Exception as e:
_logger.error(f"Database error caused by `{statement}`", e)
raise e
async def select(self, statement: str, args=None) -> Union[List[str], List[Tuple], List[Any]]:
_logger.trace(f"select {statement} with args: {args}")
try:
return await self._pool.select(statement, args)
except (MySQLError, PoolError) as e:
if self._fails >= 3:
_logger.error(f"Database error caused by `{statement}`", e)
uid = uuid.uuid4()
raise Exception(
f"Query failed three times with {type(e).__name__}. Contact an admin with the UID: {uid}"
)
_logger.error(f"Database error caused by `{statement}`", e)
self._fails += 1
try:
_logger.debug("Retry select")
return await self.select(statement, args)
except Exception as e:
pass
return []
except Exception as e:
_logger.error(f"Database error caused by `{statement}`", e)
raise e

View File

@@ -0,0 +1,105 @@
from typing import Optional, Any
import sqlparse
import aiomysql
from cpl.core.environment import Environment
from cpl.database.db_logger import DBLogger
from cpl.database.model import DatabaseSettings
_logger = DBLogger(__name__)
class MySQLPool:
"""
Create a pool when connecting to MySQL, which will decrease the time spent in
requesting connection, creating connection, and closing connection.
"""
def __init__(self, database_settings: DatabaseSettings):
self._db_settings = database_settings
self.pool: Optional[aiomysql.Pool] = None
async def _get_pool(self):
if self.pool is None or self.pool._closed:
try:
self.pool = await aiomysql.create_pool(
host=self._db_settings.host,
port=self._db_settings.port,
user=self._db_settings.user,
password=self._db_settings.password,
db=self._db_settings.database,
minsize=1,
maxsize=Environment.get("DB_POOL_SIZE", int, 1),
autocommit=True,
)
except Exception as e:
_logger.fatal("Failed to connect to the database", e)
raise
return self.pool
@staticmethod
async def _exec_sql(cursor: Any, query: str, args=None, multi=True):
if multi:
queries = [str(stmt).strip() for stmt in sqlparse.parse(query) if str(stmt).strip()]
for q in queries:
if q.strip() == "":
continue
await cursor.execute(q, args)
else:
await cursor.execute(query, args)
async def execute(self, query: str, args=None, multi=True) -> list[list]:
"""
Execute a SQL statement, it could be with args and without args. The usage is
similar to the execute() function in aiomysql.
:param query: SQL clause
:param args: args needed by the SQL clause
:param multi: if the query is a multi-statement
:return: return result
"""
pool = await self._get_pool()
async with pool.acquire() as con:
async with con.cursor() as cursor:
await self._exec_sql(cursor, query, args, multi)
if cursor.description is not None: # Query returns rows
res = await cursor.fetchall()
if res is None:
return []
return [list(row) for row in res]
else:
return []
async def select(self, query: str, args=None, multi=True) -> list[str]:
"""
Execute a SQL statement, it could be with args and without args. The usage is
similar to the execute() function in aiomysql.
:param query: SQL clause
:param args: args needed by the SQL clause
:param multi: if the query is a multi-statement
:return: return result
"""
pool = await self._get_pool()
async with pool.acquire() as con:
async with con.cursor() as cursor:
await self._exec_sql(cursor, query, args, multi)
res = await cursor.fetchall()
return list(res)
async def select_map(self, query: str, args=None, multi=True) -> list[dict]:
"""
Execute a SQL statement, it could be with args and without args. The usage is
similar to the execute() function in aiomysql.
:param query: SQL clause
:param args: args needed by the SQL clause
:param multi: if the query is a multi-statement
:return: return result
"""
pool = await self._get_pool()
async with pool.acquire() as con:
async with con.cursor(aiomysql.DictCursor) as cursor:
await self._exec_sql(cursor, query, args, multi)
res = await cursor.fetchall()
return list(res)

View File

@@ -0,0 +1,86 @@
import uuid
from typing import Any
from psycopg import OperationalError
from psycopg_pool import PoolTimeout
from cpl.core.configuration import Configuration
from cpl.core.environment import Environment
from cpl.database.abc.db_context_abc import DBContextABC
from cpl.database.database_settings import DatabaseSettings
from cpl.database.db_logger import DBLogger
from cpl.database.postgres.postgres_pool import PostgresPool
_logger = DBLogger(__name__)
class DBContext(DBContextABC):
def __init__(self):
DBContextABC.__init__(self)
self._pool: PostgresPool = None
self._fails = 0
self.connect(Configuration.get(DatabaseSettings))
def connect(self, database_settings: DatabaseSettings):
try:
_logger.debug("Connecting to database")
self._pool = PostgresPool(
database_settings,
Environment.get("DB_POOL_SIZE", int, 1),
)
_logger.info("Connected to database")
except Exception as e:
_logger.fatal("Connecting to database failed", e)
async def execute(self, statement: str, args=None, multi=True) -> list[list]:
_logger.trace(f"execute {statement} with args: {args}")
return await self._pool.execute(statement, args, multi)
async def select_map(self, statement: str, args=None) -> list[dict]:
_logger.trace(f"select {statement} with args: {args}")
try:
return await self._pool.select_map(statement, args)
except (OperationalError, PoolTimeout) as e:
if self._fails >= 3:
_logger.error(f"Database error caused by `{statement}`", e)
uid = uuid.uuid4()
raise Exception(
f"Query failed three times with {type(e).__name__}. Contact an admin with the UID: {uid}"
)
_logger.error(f"Database error caused by `{statement}`", e)
self._fails += 1
try:
_logger.debug("Retry select")
return await self.select_map(statement, args)
except Exception as e:
pass
return []
except Exception as e:
_logger.error(f"Database error caused by `{statement}`", e)
raise e
async def select(self, statement: str, args=None) -> list[str] | list[tuple] | list[Any]:
_logger.trace(f"select {statement} with args: {args}")
try:
return await self._pool.select(statement, args)
except (OperationalError, PoolTimeout) as e:
if self._fails >= 3:
_logger.error(f"Database error caused by `{statement}`", e)
uid = uuid.uuid4()
raise Exception(
f"Query failed three times with {type(e).__name__}. Contact an admin with the UID: {uid}"
)
_logger.error(f"Database error caused by `{statement}`", e)
self._fails += 1
try:
_logger.debug("Retry select")
return await self.select(statement, args)
except Exception as e:
pass
return []
except Exception as e:
_logger.error(f"Database error caused by `{statement}`", e)
raise e

View File

@@ -0,0 +1,123 @@
from typing import Optional, Any
import sqlparse
from psycopg import sql
from psycopg_pool import AsyncConnectionPool, PoolTimeout
from cpl.core.environment import Environment
from cpl.database.db_logger import DBLogger
from cpl.database.model import DatabaseSettings
_logger = DBLogger(__name__)
class PostgresPool:
"""
Create a pool when connecting to PostgreSQL, which will decrease the time spent in
requesting connection, creating connection, and closing connection.
"""
def __init__(self, database_settings: DatabaseSettings):
self._conninfo = (
f"host={database_settings.host} "
f"port={database_settings.port} "
f"user={database_settings.user} "
f"password={database_settings.password} "
f"dbname={database_settings.database}"
)
self.pool: Optional[AsyncConnectionPool] = None
async def _get_pool(self):
pool = AsyncConnectionPool(
conninfo=self._conninfo, open=False, min_size=1, max_size=Environment.get("DB_POOL_SIZE", int, 1)
)
await pool.open()
try:
async with pool.connection() as con:
await pool.check_connection(con)
except PoolTimeout as e:
await pool.close()
_logger.fatal(f"Failed to connect to the database", e)
return pool
@staticmethod
async def _exec_sql(cursor: Any, query: str, args=None, multi=True):
if multi:
queries = [str(stmt).strip() for stmt in sqlparse.parse(query) if str(stmt).strip()]
for q in queries:
if q.strip() == "":
continue
await cursor.execute(sql.SQL(q), args)
else:
await cursor.execute(sql.SQL(query), args)
async def execute(self, query: str, args=None, multi=True) -> list[list]:
"""
Execute a SQL statement, it could be with args and without args. The usage is
similar to the execute() function in the psycopg module.
:param query: SQL clause
:param args: args needed by the SQL clause
:param multi: if the query is a multi-statement
:return: return result
"""
async with await self._get_pool() as pool:
async with pool.connection() as con:
async with con.cursor() as cursor:
await self._exec_sql(cursor, query, args, multi)
if cursor.description is not None: # Check if the query returns rows
res = await cursor.fetchall()
if res is None:
return []
result = []
for row in res:
result.append(list(row))
return result
else:
return []
async def select(self, query: str, args=None, multi=True) -> list[str]:
"""
Execute a SQL statement, it could be with args and without args. The usage is
similar to the execute() function in the psycopg module.
:param query: SQL clause
:param args: args needed by the SQL clause
:param multi: if the query is a multi-statement
:return: return result
"""
async with await self._get_pool() as pool:
async with pool.connection() as con:
async with con.cursor() as cursor:
await self._exec_sql(cursor, query, args, multi)
res = await cursor.fetchall()
return list(res)
async def select_map(self, query: str, args=None, multi=True) -> list[dict]:
"""
Execute a SQL statement, it could be with args and without args. The usage is
similar to the execute() function in the psycopg module.
:param query: SQL clause
:param args: args needed by the SQL clause
:param multi: if the query is a multi-statement
:return: return result
"""
async with await self._get_pool() as pool:
async with pool.connection() as con:
async with con.cursor() as cursor:
await self._exec_sql(cursor, query, args, multi)
res = await cursor.fetchall()
res_map: list[dict] = []
for i_res in range(len(res)):
cols = {}
for i_col in range(len(res[i_res])):
cols[cursor.description[i_col].name] = res[i_res][i_col]
res_map.append(cols)
return res_map

View File

@@ -0,0 +1,154 @@
from typing import Optional, Union
from cpl.database._external_data_temp_table_builder import ExternalDataTempTableBuilder
class SQLSelectBuilder:
def __init__(self, table_name: str, primary_key: str):
self._table_name = table_name
self._primary_key = primary_key
self._temp_tables: dict[str, ExternalDataTempTableBuilder] = {}
self._to_use_temp_tables: list[str] = []
self._attributes: list[str] = []
self._tables: list[str] = [table_name]
self._joins: dict[str, (str, str)] = {}
self._conditions: list[str] = []
self._order_by: str = ""
self._limit: Optional[int] = None
self._offset: Optional[int] = None
def with_temp_table(self, temp_table: ExternalDataTempTableBuilder) -> "SQLSelectBuilder":
self._temp_tables[temp_table.table_name] = temp_table
return self
def use_temp_table(self, temp_table_name: str):
if temp_table_name not in self._temp_tables:
raise ValueError(f"Temp table {temp_table_name} not found.")
self._to_use_temp_tables.append(temp_table_name)
def with_attribute(self, attr: str, ignore_table_name=False) -> "SQLSelectBuilder":
if not ignore_table_name and not attr.startswith(self._table_name):
attr = f"{self._table_name}.{attr}"
self._attributes.append(attr)
return self
def with_foreign_attribute(self, attr: str) -> "SQLSelectBuilder":
self._attributes.append(attr)
return self
def with_table(self, table_name: str) -> "SQLSelectBuilder":
self._tables.append(table_name)
return self
def _check_prefix(self, attr: str, foreign_tables: list[str]) -> str:
assert attr is not None
if "TO_CHAR" in attr:
return attr
valid_prefixes = [
"levenshtein",
self._table_name,
*self._joins.keys(),
*self._temp_tables.keys(),
*foreign_tables,
]
if not any(attr.startswith(f"{prefix}.") for prefix in valid_prefixes):
attr = f"{self._table_name}.{attr}"
return attr
def with_value_condition(
self, attr: str, operator: str, value: str, foreign_tables: list[str]
) -> "SQLSelectBuilder":
attr = self._check_prefix(attr, foreign_tables)
self._conditions.append(f"{attr} {operator} {value}")
return self
def with_levenshtein_condition(self, condition: str) -> "SQLSelectBuilder":
self._conditions.append(condition)
return self
def with_condition(self, attr: str, operator: str, foreign_tables: list[str]) -> "SQLSelectBuilder":
attr = self._check_prefix(attr, foreign_tables)
self._conditions.append(f"{attr} {operator}")
return self
def with_grouped_conditions(self, conditions: list[str]) -> "SQLSelectBuilder":
self._conditions.append(f"({' AND '.join(conditions)})")
return self
def with_left_join(self, table: str, on: str) -> "SQLSelectBuilder":
if table in self._joins:
self._joins[table] = (f"{self._joins[table][0]} AND {on}", "LEFT")
self._joins[table] = (on, "LEFT")
return self
def with_inner_join(self, table: str, on: str) -> "SQLSelectBuilder":
if table in self._joins:
self._joins[table] = (f"{self._joins[table][0]} AND {on}", "INNER")
self._joins[table] = (on, "INNER")
return self
def with_right_join(self, table: str, on: str) -> "SQLSelectBuilder":
if table in self._joins:
self._joins[table] = (f"{self._joins[table][0]} AND {on}", "RIGHT")
self._joins[table] = (on, "RIGHT")
return self
def with_limit(self, limit: int) -> "SQLSelectBuilder":
self._limit = limit
return self
def with_offset(self, offset: int) -> "SQLSelectBuilder":
self._offset = offset
return self
def with_order_by(self, column: Union[str, property], direction: str = "ASC") -> "SQLSelectBuilder":
if isinstance(column, property):
column = column.fget.__name__
self._order_by = f"{column} {direction}"
return self
async def _handle_temp_table_use(self, query) -> str:
new_query = ""
for temp_table_name in self._to_use_temp_tables:
temp_table = self._temp_tables[temp_table_name]
new_query += await self._temp_tables[temp_table_name].build()
self.with_left_join(
temp_table.table_name,
f"{temp_table.join_ref_table}.{self._primary_key} = {temp_table.table_name}.{temp_table.primary_key}",
)
return f"{new_query} {query}" if new_query != "" else query
async def build(self) -> str:
query = await self._handle_temp_table_use("")
attributes = ", ".join(self._attributes) if self._attributes else "*"
query += f"SELECT {attributes} FROM {", ".join(self._tables)}"
for join in self._joins:
query += f" {self._joins[join][1]} JOIN {join} ON {self._joins[join][0]}"
if self._conditions:
query += " WHERE " + " AND ".join(self._conditions)
if self._order_by:
query += f" ORDER BY {self._order_by}"
if self._limit is not None:
query += f" LIMIT {self._limit}"
if self._offset is not None:
query += f" OFFSET {self._offset}"
return query

View File

@@ -0,0 +1,18 @@
from datetime import datetime
from typing import Optional
from cpl.database.abc import DbModelABC
class ExecutedMigration(DbModelABC):
def __init__(
self,
migration_id: str,
created: Optional[datetime] = None,
modified: Optional[datetime] = None,
):
DbModelABC.__init__(self, migration_id, False, created, modified)
@property
def migration_id(self) -> str:
return self._id

View File

@@ -0,0 +1,14 @@
from cpl.database import InternalTables
from cpl.database.abc.data_access_object_abc import DataAccessObjectABC
from cpl.database.db_logger import DBLogger
from cpl.database.schema.executed_migration import ExecutedMigration
_logger = DBLogger(__name__)
class ExecutedMigrationDao(DataAccessObjectABC[ExecutedMigration]):
def __init__(self):
DataAccessObjectABC.__init__(self, __name__, ExecutedMigration, InternalTables.executed_migrations)
self.attribute(ExecutedMigration.migration_id, str, primary_key=True, db_name="migrationId")

View File

@@ -0,0 +1,6 @@
CREATE TABLE IF NOT EXISTS _executed_migrations
(
migrationId VARCHAR(255) PRIMARY KEY,
created TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);

View File

@@ -0,0 +1,26 @@
DELIMITER //
CREATE TRIGGER mytable_before_update
BEFORE UPDATE
ON mytable
FOR EACH ROW
BEGIN
INSERT INTO mytable_history
SELECT OLD.*;
SET NEW.updated = NOW();
END;
//
DELIMITER ;
DELIMITER //
CREATE TRIGGER mytable_before_delete
BEFORE DELETE
ON mytable
FOR EACH ROW
BEGIN
INSERT INTO mytable_history
SELECT OLD.*;
END;
//
DELIMITER ;

View File

@@ -0,0 +1,47 @@
CREATE SCHEMA IF NOT EXISTS public;
CREATE SCHEMA IF NOT EXISTS system;
CREATE TABLE IF NOT EXISTS system._executed_migrations
(
MigrationId VARCHAR(255) PRIMARY KEY,
Created timestamptz NOT NULL DEFAULT NOW(),
Updated timestamptz NOT NULL DEFAULT NOW()
);
CREATE OR REPLACE FUNCTION public.history_trigger_function()
RETURNS TRIGGER AS
$$
DECLARE
schema_name TEXT;
history_table_name TEXT;
BEGIN
-- Construct the name of the history table based on the current table
schema_name := TG_TABLE_SCHEMA;
history_table_name := TG_TABLE_NAME || '_history';
IF (TG_OP = 'INSERT') THEN
RETURN NEW;
END IF;
-- Insert the old row into the history table on UPDATE or DELETE
IF (TG_OP = 'UPDATE' OR TG_OP = 'DELETE') THEN
EXECUTE format(
'INSERT INTO %I.%I SELECT ($1).*',
schema_name,
history_table_name
)
USING OLD;
END IF;
-- For UPDATE, update the Updated column and return the new row
IF (TG_OP = 'UPDATE') THEN
NEW.updated := NOW(); -- Update the Updated column
RETURN NEW;
END IF;
-- For DELETE, return OLD to allow the deletion
IF (TG_OP = 'DELETE') THEN
RETURN OLD;
END IF;
END;
$$ LANGUAGE plpgsql;

View File

@@ -0,0 +1,111 @@
import glob
import os
from cpl.database.abc import DBContextABC
from cpl.database.db_logger import DBLogger
from cpl.database.model import Migration
from cpl.database.model.server_type import ServerType, ServerTypes
from cpl.database.schema.executed_migration import ExecutedMigration
from cpl.database.schema.executed_migration_dao import ExecutedMigrationDao
_logger = DBLogger(__name__)
class MigrationService:
def __init__(self, db: DBContextABC, executedMigrationDao: ExecutedMigrationDao):
self._db = db
self._executedMigrationDao = executedMigrationDao
self._script_directories: list[str] = []
if ServerType.server_type == ServerTypes.POSTGRES:
self.with_directory(os.path.join(os.path.dirname(os.path.realpath(__file__)), "../scripts/postgres"))
elif ServerType.server_type == ServerTypes.MYSQL:
self.with_directory(os.path.join(os.path.dirname(os.path.realpath(__file__)), "../scripts/mysql"))
def with_directory(self, directory: str) -> "MigrationService":
self._script_directories.append(directory)
return self
async def _get_migration_history(self) -> list[ExecutedMigration]:
results = await self._db.select(f"SELECT * FROM {self._executedMigrationDao.table_name}")
applied_migrations = []
for result in results:
applied_migrations.append(ExecutedMigration(result[0]))
return applied_migrations
@staticmethod
def _load_scripts_by_path(path: str) -> list[Migration]:
migrations = []
if not os.path.exists(path):
raise Exception("Migration path not found")
files = sorted(glob.glob(f"{path}/*"))
for file in files:
if not file.endswith(".sql"):
continue
name = str(file.split(".sql")[0])
if "/" in name:
name = name.split("/")[-1]
with open(f"{file}", "r") as f:
script = f.read()
f.close()
migrations.append(Migration(name, script))
return migrations
def _load_scripts(self) -> list[Migration]:
migrations = []
for path in self._script_directories:
migrations.extend(self._load_scripts_by_path(path))
return migrations
async def _get_tables(self):
if ServerType == ServerTypes.POSTGRES:
return await self._db.select(
"""
SELECT tablename
FROM pg_tables
WHERE schemaname = 'public';
"""
)
else:
return await self._db.select(
"""
SHOW TABLES;
"""
)
async def _execute(self, migrations: list[Migration]):
result = await self._get_tables()
for migration in migrations:
active_statement = ""
try:
# check if table exists
if len(result) > 0:
migration_from_db = await self._executedMigrationDao.find_by_id(migration.name)
if migration_from_db is not None:
continue
_logger.debug(f"Running upgrade migration: {migration.name}")
await self._db.execute(migration.script, multi=True)
await self._executedMigrationDao.create(ExecutedMigration(migration.name), skip_editor=True)
except Exception as e:
_logger.fatal(
f"Migration failed: {migration.name}\n{active_statement}",
e,
)
async def migrate(self):
await self._execute(self._load_scripts())

View File

@@ -0,0 +1,65 @@
from datetime import datetime
from typing import TypeVar, Union, Literal, Any
from cpl.database.abc.db_model_abc import DbModelABC
T_DBM = TypeVar("T_DBM", bound=DbModelABC)
NumberFilterOperator = Literal[
"equal",
"notEqual",
"greater",
"greaterOrEqual",
"less",
"lessOrEqual",
"isNull",
"isNotNull",
]
StringFilterOperator = Literal[
"equal",
"notEqual",
"contains",
"notContains",
"startsWith",
"endsWith",
"isNull",
"isNotNull",
]
BoolFilterOperator = Literal[
"equal",
"notEqual",
"isNull",
"isNotNull",
]
DateFilterOperator = Literal[
"equal",
"notEqual",
"greater",
"greaterOrEqual",
"less",
"lessOrEqual",
"isNull",
"isNotNull",
]
FilterOperator = Union[NumberFilterOperator, StringFilterOperator, BoolFilterOperator, DateFilterOperator]
Attribute = Union[str, property]
AttributeCondition = Union[
dict[NumberFilterOperator, int],
dict[StringFilterOperator, str],
dict[BoolFilterOperator, bool],
dict[DateFilterOperator, datetime],
]
AttributeFilter = dict[Attribute, Union[list[Union[AttributeCondition, Any]], AttributeCondition, Any]]
AttributeFilters = Union[
list[AttributeFilter],
AttributeFilter,
]
AttributeSort = dict[Attribute, Literal["asc", "desc"]]
AttributeSorts = Union[
list[AttributeSort],
AttributeSort,
]

View File

@@ -0,0 +1,30 @@
[build-system]
requires = ["setuptools>=70.1.0", "wheel>=0.43.0"]
build-backend = "setuptools.build_meta"
[project]
name = "cpl-database"
version = "2024.7.0"
description = "CPL database"
readme ="CPL database package"
requires-python = ">=3.12"
license = { text = "MIT" }
authors = [
{ name = "Sven Heidemann", email = "sven.heidemann@sh-edraft.de" }
]
keywords = ["cpl", "database", "backend", "shared", "library"]
dynamic = ["dependencies", "optional-dependencies"]
[project.urls]
Homepage = "https://www.sh-edraft.de"
[tool.setuptools.packages.find]
where = ["."]
include = ["cpl*"]
[tool.setuptools.dynamic]
dependencies = { file = ["requirements.txt"] }
optional-dependencies.dev = { file = ["requirements.dev.txt"] }

View File

@@ -0,0 +1 @@
black==25.1.0

View File

@@ -0,0 +1,8 @@
cpl-core
cpl-dependency
psycopg[binary]==3.2.3
psycopg-pool==3.2.4
sqlparse==0.5.3
mysql-connector-python==9.4.0
async-property==0.2.2
aiomysql==0.2.0

View File

@@ -1,7 +1,6 @@
from .scope import Scope
from .scope_abc import ScopeABC
from .service_collection import ServiceCollection
from .service_collection_abc import ServiceCollectionABC
from .service_descriptor import ServiceDescriptor
from .service_lifetime_enum import ServiceLifetimeEnum
from .service_provider import ServiceProvider

Some files were not shown because too many files have changed in this diff Show More