Compare commits

..

2 Commits

Author SHA1 Message Date
b6cf5962aa Merge pull request 'Removed tools & docs' (#197) from dev_cleanup into master
Reviewed-on: #197
2025-10-08 21:30:47 +02:00
d3084041a9 Removed tools & docs 2025-10-08 21:30:01 +02:00
549 changed files with 13027 additions and 4130 deletions

View File

@@ -1,41 +0,0 @@
# CI entry point: runs on every push to the `dev` branch.
# Delegates version preparation and per-package builds to reusable workflows.
name: Build on push
run-name: Build on push
on:
  push:
    branches:
      - dev
jobs:
  # Compute a "dev"-suffixed build version, tag the repo, publish version.txt.
  prepare:
    uses: ./.gitea/workflows/prepare.yaml
    with:
      version_suffix: 'dev'
    secrets: inherit
  # Base packages build as soon as prepare finishes.
  core:
    uses: ./.gitea/workflows/package.yaml
    needs: [prepare]
    with:
      working_directory: src/cpl-core
    secrets: inherit
  query:
    uses: ./.gitea/workflows/package.yaml
    needs: [prepare]
    with:
      working_directory: src/cpl-query
    secrets: inherit
  # Packages depending on cpl-core wait for the core build as well.
  translation:
    uses: ./.gitea/workflows/package.yaml
    needs: [ prepare, core ]
    with:
      working_directory: src/cpl-translation
    secrets: inherit
  mail:
    uses: ./.gitea/workflows/package.yaml
    needs: [ prepare, core ]
    with:
      working_directory: src/cpl-mail
    secrets: inherit

View File

@@ -1,39 +0,0 @@
# CI entry point: runs on every push to the `master` branch.
# Same pipeline as the dev workflow but without a version suffix,
# so prepare.yaml produces a plain release version (YYYY.MM.DD.N).
name: Build on push
run-name: Build on push
on:
  push:
    branches:
      - master
jobs:
  # Compute the release version, tag the repo, publish version.txt.
  prepare:
    uses: ./.gitea/workflows/prepare.yaml
    secrets: inherit
  # Base packages build as soon as prepare finishes.
  core:
    uses: ./.gitea/workflows/package.yaml
    needs: [prepare]
    with:
      working_directory: src/cpl-core
    secrets: inherit
  query:
    uses: ./.gitea/workflows/package.yaml
    needs: [prepare]
    with:
      working_directory: src/cpl-query
    secrets: inherit
  # Packages depending on cpl-core wait for the core build as well.
  translation:
    uses: ./.gitea/workflows/package.yaml
    needs: [ prepare, core ]
    with:
      working_directory: src/cpl-translation
    secrets: inherit
  mail:
    uses: ./.gitea/workflows/package.yaml
    needs: [ prepare, core ]
    with:
      working_directory: src/cpl-mail
    secrets: inherit

View File

@@ -1,65 +0,0 @@
# Reusable workflow: builds one Python package from `working_directory`
# and uploads it to the internal Gitea PyPI registry via twine.
# Called once per package from the push workflows.
name: Build Package
run-name: Build Python Package
on:
  workflow_call:
    inputs:
      version_suffix:
        description: 'Suffix for version (z.B. "dev", "alpha", "beta")'
        required: false
        type: string
      working_directory:
        # Package directory (e.g. src/cpl-core); all run steps execute here.
        required: true
        type: string
jobs:
  build:
    runs-on: [ runner ]
    container: git.sh-edraft.de/sh-edraft.de/act-runner:latest
    defaults:
      run:
        working-directory: ${{ inputs.working_directory }}
    steps:
      - name: Clone Repository
        uses: https://github.com/actions/checkout@v3
        with:
          token: ${{ secrets.CI_ACCESS_TOKEN }}
      # Fetch version.txt produced and uploaded by the prepare workflow.
      - name: Download build version artifact
        uses: actions/download-artifact@v3
        with:
          name: version
      # Rewrite `version = "..."` in pyproject.toml with the prepared version.
      # NOTE(review): the path /workspace/sh-edraft.de/cpl/version.txt is
      # runner-specific — confirm it matches the artifact download location.
      - name: Set version
        run: |
          sed -i -E "s/^version = \".*\"/version = \"$(cat /workspace/sh-edraft.de/cpl/version.txt)\"/" pyproject.toml
          echo "Set version to $(cat /workspace/sh-edraft.de/cpl/version.txt)"
          cat pyproject.toml
      # Point pip at the internal index so cpl-* dependencies resolve.
      - name: Set pip conf
        run: |
          cat > .pip.conf <<'EOF'
          [global]
          extra-index-url = https://git.sh-edraft.de/api/packages/sh-edraft.de/pypi/simple/
          EOF
      - name: Install Dependencies
        run: |
          export PIP_CONFIG_FILE=".pip.conf"
          pip install build
      - name: Build Package
        run: |
          python -m build --outdir dist
      - name: Login to registry git.sh-edraft.de
        uses: https://github.com/docker/login-action@v1
        with:
          registry: git.sh-edraft.de
          username: ${{ secrets.CI_USERNAME }}
          password: ${{ secrets.CI_ACCESS_TOKEN }}
      # Upload the built sdist/wheel to the Gitea PyPI registry.
      - name: Push image
        run: |
          pip install twine
          python -m twine upload --repository-url https://git.sh-edraft.de/api/packages/sh-edraft.de/pypi -u ${{ secrets.CI_USERNAME }} -p ${{ secrets.CI_ACCESS_TOKEN }} ./dist/*

View File

@@ -1,54 +0,0 @@
# Reusable workflow: computes a date-based build version, tags the repo
# with it, and publishes version.txt as an artifact for package builds.
name: Prepare Build
run-name: Prepare Build Version
on:
  workflow_call:
    inputs:
      version_suffix:
        description: 'Suffix for version (z.B. "dev", "alpha", "beta")'
        required: false
        type: string
jobs:
  prepare:
    runs-on: [ runner ]
    container: git.sh-edraft.de/sh-edraft.de/act-runner:latest
    steps:
      - name: Clone Repository
        uses: https://github.com/actions/checkout@v3
        with:
          token: ${{ secrets.CI_ACCESS_TOKEN }}
      # Version scheme: YYYY.MM.DD.N where N = (tags already created today) + 1.
      # Suffix "dev" yields YYYY.MM.DD.devN; any other suffix is appended
      # after the build number.
      - name: Get Date and Build Number
        run: |
          git fetch --tags
          git tag
          DATE=$(date +'%Y.%m.%d')
          TAG_COUNT=$(git tag -l "${DATE}.*" | wc -l)
          BUILD_NUMBER=$(($TAG_COUNT + 1))
          VERSION_SUFFIX=${{ inputs.version_suffix }}
          if [ -n "$VERSION_SUFFIX" ] && [ "$VERSION_SUFFIX" = "dev" ]; then
            BUILD_VERSION="${DATE}.dev${BUILD_NUMBER}"
          elif [ -n "$VERSION_SUFFIX" ]; then
            BUILD_VERSION="${DATE}.${BUILD_NUMBER}${VERSION_SUFFIX}"
          else
            BUILD_VERSION="${DATE}.${BUILD_NUMBER}"
          fi
          echo "$BUILD_VERSION" > version.txt
          echo "VERSION $BUILD_VERSION"
      # Tag the commit with the computed version so later runs on the same
      # day count it when deriving the next build number.
      - name: Create Git Tag for Build
        run: |
          git config user.name "ci"
          git config user.email "dev@sh-edraft.de"
          echo "tag $(cat version.txt)"
          git tag $(cat version.txt)
          git push origin --tags
      # Expose version.txt to downstream package jobs.
      - name: Upload build version artifact
        uses: actions/upload-artifact@v3
        with:
          name: version
          path: version.txt

View File

@@ -1,2 +0,0 @@
# Extra package index so internal cpl-* packages resolve from the
# sh-edraft Gitea PyPI registry in addition to pypi.org.
[global]
extra-index-url = https://git.sh-edraft.de/api/packages/sh-edraft.de/pypi/simple/

153
README.md
View File

@@ -0,0 +1,153 @@
<h1 align="center">CPL - Common python library</h1>
<!-- Summary -->
<p align="center">
<!-- <img src="" alt="cpl-logo" width="120px" height="120px"/> -->
<br>
<i>
CPL is a development platform for Python server applications,
<br>written in Python.</i>
<br>
</p>
## Table of Contents
<!-- TABLE OF CONTENTS -->
<ol>
<li><a href="#Features">Features</a></li>
<li>
<a href="#getting-started">Getting Started</a>
<ul>
<li><a href="#prerequisites">Prerequisites</a></li>
<li><a href="#installation">Installation</a></li>
</ul>
</li>
<li><a href="#roadmap">Roadmap</a></li>
<li><a href="#contributing">Contributing</a></li>
<li><a href="#license">License</a></li>
<li><a href="#contact">Contact</a></li>
</ol>
## Features
<!-- FEATURE OVERVIEW -->
- Expandable
- Application base
- Standardized application classes
- Application object builder
- Application extension classes
- Startup classes
- Startup extension classes
- Configuration
- Configure via object mapped JSON
- Console argument handling
- Console class for in and output
- Banner
- Spinner
- Options (menu)
- Table
- Write
- Write_at
- Write_line
- Write_line_at
- Dependency injection
- Service lifetimes: singleton, scoped and transient
- Providing of application environment
- Environment (development, staging, testing, production)
- Appname
- Customer
- Hostname
- Runtime directory
- Working directory
- Logging
- Standardized logger
- Log-level (FATAL, ERROR, WARN, INFO, DEBUG & TRACE)
- Mail handling
- Send mails
- Pipe classes
- Convert input
- Utils
- Credential manager
- Encryption via BASE64
- PIP wrapper class based on subprocess
- Run pip commands
- String converter to different variants
- to_lower_case
- to_camel_case
- ...
<!-- GETTING STARTED -->
## Getting Started
[Get started with CPL][quickstart].
### Prerequisites
- Install [python] which includes [Pip installs packages][pip]
### Installation
Install the CPL package
```sh
pip install cpl-core --extra-index-url https://pip.sh-edraft.de
```
Install the CPL CLI
```sh
pip install cpl-cli --extra-index-url https://pip.sh-edraft.de
```
Create workspace:
```sh
cpl new <console|library|unittest> <PROJECT NAME>
```
Run the application:
```sh
cd <PROJECT NAME>
cpl start
```
<!-- ROADMAP -->
## Roadmap
See the [open issues](https://git.sh-edraft.de/sh-edraft.de/sh_cpl/issues) for a list of proposed features (and known issues).
<!-- CONTRIBUTING -->
## Contributing
### Contributing Guidelines
Read through our [contributing guidelines][contributing] to learn about our submission process, coding rules and more.
### Want to Help?
Want to file a bug, contribute some code, or improve documentation? Excellent! Read up on our guidelines for [contributing][contributing].
<!-- LICENSE -->
## License
Distributed under the MIT License. See [LICENSE] for more information.
<!-- CONTACT -->
## Contact
Sven Heidemann - sven.heidemann@sh-edraft.de
Project link: [https://git.sh-edraft.de/sh-edraft.de/sh_common_py_lib](https://git.sh-edraft.de/sh-edraft.de/sh_cpl)
<!-- External LINKS -->
[pip_url]: https://pip.sh-edraft.de
[python]: https://www.python.org/
[pip]: https://pypi.org/project/pip/
<!-- Internal LINKS -->
[project]: https://git.sh-edraft.de/sh-edraft.de/sh_cpl
[quickstart]: https://git.sh-edraft.de/sh-edraft.de/sh_cpl/wiki/quickstart
[contributing]: https://git.sh-edraft.de/sh-edraft.de/sh_cpl/wiki/contributing
[license]: LICENSE

151
cpl-workspace.json Normal file
View File

@@ -0,0 +1,151 @@
{
"WorkspaceSettings": {
"DefaultProject": "cpl-core",
"Projects": {
"cpl-cli": "src/cpl_cli/cpl-cli.json",
"cpl-core": "src/cpl_core/cpl-core.json",
"cpl-discord": "src/cpl_discord/cpl-discord.json",
"cpl-query": "src/cpl_query/cpl-query.json",
"cpl-translation": "src/cpl_translation/cpl-translation.json",
"set-version": "tools/set_version/set-version.json",
"set-pip-urls": "tools/set_pip_urls/set-pip-urls.json",
"unittests": "unittests/unittests/unittests.json",
"unittests_cli": "unittests/unittests_cli/unittests_cli.json",
"unittests_core": "unittests/unittests_core/unittests_core.json",
"unittests_query": "unittests/unittests_query/unittests_query.json",
"unittests_shared": "unittests/unittests_shared/unittests_shared.json",
"unittests_translation": "unittests/unittests_translation/unittests_translation.json"
},
"Scripts": {
"hello-world": "echo 'Hello World'",
"format": "echo 'Formatting:'; black ./",
"sv": "cpl set-version",
"set-version": "cpl run set-version --dev $ARGS; echo '';",
"spu": "cpl set-pip-urls",
"set-pip-urls": "cpl run set-pip-urls --dev $ARGS; echo '';",
"docs-build": "cpl format; echo 'Build Documentation'; cpl db-core; cpl db-discord; cpl db-query; cpl db-translation; cd docs/; make clean; make html;",
"db-core": "cd docs/; sphinx-apidoc -o source/ ../src/cpl_core; cd ../",
"db-discord": "cd docs/; sphinx-apidoc -o source/ ../src/cpl_discord; cd ../",
"db-query": "cd docs/; sphinx-apidoc -o source/ ../src/cpl_query; cd ../",
"db-translation": "cd docs/; sphinx-apidoc -o source/ ../src/cpl_translation; cd ../",
"db": "cpl docs-build",
"docs-open": "xdg-open $PWD/docs/build/html/index.html &",
"do": "cpl docs-open",
"test": "cpl run unittests",
"pre-build-all": "cpl sv $ARGS; cpl spu $ARGS;",
"build-all": "cpl build-cli; cpl build-core; cpl build-discord; cpl build-query; cpl build-translation; cpl build-set-pip-urls; cpl build-set-version",
"ba": "cpl build-all $ARGS",
"build-cli": "echo 'Build cpl-cli'; cd ./src/cpl_cli; cpl build; cd ../../;",
"build-core": "echo 'Build cpl-core'; cd ./src/cpl_core; cpl build; cd ../../;",
"build-discord": "echo 'Build cpl-discord'; cd ./src/cpl_discord; cpl build; cd ../../;",
"build-query": "echo 'Build cpl-query'; cd ./src/cpl_query; cpl build; cd ../../;",
"build-translation": "echo 'Build cpl-translation'; cd ./src/cpl_translation; cpl build; cd ../../;",
"build-set-pip-urls": "echo 'Build set-pip-urls'; cd ./tools/set_pip_urls; cpl build; cd ../../;",
"build-set-version": "echo 'Build set-version'; cd ./tools/set_version; cpl build; cd ../../;",
"pre-publish-all": "cpl sv $ARGS; cpl spu $ARGS;",
"publish-all": "cpl publish-cli; cpl publish-core; cpl publish-discord; cpl publish-query; cpl publish-translation;",
"pa": "cpl publish-all $ARGS",
"publish-cli": "echo 'Publish cpl-cli'; cd ./src/cpl_cli; cpl publish; cd ../../;",
"publish-core": "echo 'Publish cpl-core'; cd ./src/cpl_core; cpl publish; cd ../../;",
"publish-discord": "echo 'Publish cpl-discord'; cd ./src/cpl_discord; cpl publish; cd ../../;",
"publish-query": "echo 'Publish cpl-query'; cd ./src/cpl_query; cpl publish; cd ../../;",
"publish-translation": "echo 'Publish cpl-translation'; cd ./src/cpl_translation; cpl publish; cd ../../;",
"upload-prod-cli": "echo 'PROD Upload cpl-cli'; cpl upl-prod-cli;",
"upl-prod-cli": "twine upload -r pip.sh-edraft.de dist/cpl-cli/publish/setup/*",
"upload-prod-core": "echo 'PROD Upload cpl-core'; cpl upl-prod-core;",
"upl-prod-core": "twine upload -r pip.sh-edraft.de dist/cpl-core/publish/setup/*",
"upload-prod-discord": "echo 'PROD Upload cpl-discord'; cpl upl-prod-discord;",
"upl-prod-discord": "twine upload -r pip.sh-edraft.de dist/cpl-discord/publish/setup/*",
"upload-prod-query": "echo 'PROD Upload cpl-query'; cpl upl-prod-query;",
"upl-prod-query": "twine upload -r pip.sh-edraft.de dist/cpl-query/publish/setup/*",
"upload-prod-translation": "echo 'PROD Upload cpl-translation'; cpl upl-prod-translation;",
"upl-prod-translation": "twine upload -r pip.sh-edraft.de dist/cpl-translation/publish/setup/*",
"upload-exp-cli": "echo 'EXP Upload cpl-cli'; cpl upl-exp-cli;",
"upl-exp-cli": "twine upload -r pip-exp.sh-edraft.de dist/cpl-cli/publish/setup/*",
"upload-exp-core": "echo 'EXP Upload cpl-core'; cpl upl-exp-core;",
"upl-exp-core": "twine upload -r pip-exp.sh-edraft.de dist/cpl-core/publish/setup/*",
"upload-exp-discord": "echo 'EXP Upload cpl-discord'; cpl upl-exp-discord;",
"upl-exp-discord": "twine upload -r pip-exp.sh-edraft.de dist/cpl-discord/publish/setup/*",
"upload-exp-query": "echo 'EXP Upload cpl-query'; cpl upl-exp-query;",
"upl-exp-query": "twine upload -r pip-exp.sh-edraft.de dist/cpl-query/publish/setup/*",
"upload-exp-translation": "echo 'EXP Upload cpl-translation'; cpl upl-exp-translation;",
"upl-exp-translation": "twine upload -r pip-exp.sh-edraft.de dist/cpl-translation/publish/setup/*",
"upload-dev-cli": "echo 'DEV Upload cpl-cli'; cpl upl-dev-cli;",
"upl-dev-cli": "twine upload -r pip-dev.sh-edraft.de dist/cpl-cli/publish/setup/*",
"upload-dev-core": "echo 'DEV Upload cpl-core'; cpl upl-dev-core;",
"upl-dev-core": "twine upload -r pip-dev.sh-edraft.de dist/cpl-core/publish/setup/*",
"upload-dev-discord": "echo 'DEV Upload cpl-discord'; cpl upl-dev-discord;",
"upl-dev-discord": "twine upload -r pip-dev.sh-edraft.de dist/cpl-discord/publish/setup/*",
"upload-dev-query": "echo 'DEV Upload cpl-query'; cpl upl-dev-query;",
"upl-dev-query": "twine upload -r pip-dev.sh-edraft.de dist/cpl-query/publish/setup/*",
"upload-dev-translation": "echo 'DEV Upload cpl-translation'; cpl upl-dev-translation;",
"upl-dev-translation": "twine upload -r pip-dev.sh-edraft.de dist/cpl-translation/publish/setup/*",
"pre-deploy-prod": "cpl sv $ARGS; cpl spu --environment=production;",
"deploy-prod": "cpl deploy-prod-cli; cpl deploy-prod-core; cpl deploy-prod-discord; cpl deploy-prod-query; cpl deploy-prod-translation;",
"dp": "cpl deploy-prod $ARGS",
"deploy-prod-cli": "cpl publish-cli; cpl upload-prod-cli",
"deploy-prod-core": "cpl publish-core; cpl upload-prod-core",
"deploy-prod-query": "cpl publish-query; cpl upload-prod-query",
"deploy-prod-discord": "cpl publish-discord; cpl upload-prod-discord",
"deploy-prod-translation": "cpl publish-translation; cpl upload-prod-translation",
"pre-deploy-exp": "cpl sv $ARGS; cpl spu --environment=staging;",
"deploy-exp": "cpl deploy-exp-cli; cpl deploy-exp-core; cpl deploy-exp-discord; cpl deploy-exp-query; cpl deploy-exp-translation;",
"de": "cpl deploy-exp $ARGS",
"deploy-exp-cli": "cpl publish-cli; cpl upload-exp-cli",
"deploy-exp-core": "cpl publish-core; cpl upload-exp-core",
"deploy-exp-discord": "cpl publish-discord; cpl upload-exp-discord",
"deploy-exp-query": "cpl publish-query; cpl upload-exp-query",
"deploy-exp-translation": "cpl publish-translation; cpl upload-exp-translation",
"pre-deploy-dev": "cpl sv $ARGS; cpl spu --environment=development;",
"deploy-dev": "cpl deploy-dev-cli; cpl deploy-dev-core; cpl deploy-dev-discord; cpl deploy-dev-query; cpl deploy-dev-translation;",
"dd": "cpl deploy-dev $ARGS",
"deploy-dev-cli": "cpl publish-cli; cpl upload-dev-cli",
"deploy-dev-core": "cpl publish-core; cpl upload-dev-core",
"deploy-dev-discord": "cpl publish-discord; cpl upload-dev-discord",
"deploy-dev-query": "cpl publish-query; cpl upload-dev-query",
"deploy-dev-translation": "cpl publish-query; cpl upload-dev-translation",
"dev-install": "cpl di-core; cpl di-cli; cpl di-query; cpl di-translation;",
"di": "cpl dev-install",
"di-core": "pip install cpl-core --pre --upgrade --extra-index-url https://pip-dev.sh-edraft.de",
"di-cli": "pip install cpl-cli --pre --upgrade --extra-index-url https://pip-dev.sh-edraft.de",
"di-discord": "pip install cpl-discord --pre --upgrade --extra-index-url https://pip-dev.sh-edraft.de",
"di-query": "pip install cpl-query --pre --upgrade --extra-index-url https://pip-dev.sh-edraft.de",
"di-translation": "pip install cpl-translation --pre --upgrade --extra-index-url https://pip-dev.sh-edraft.de",
"prod-install": "cpl pi-core; cpl pi-cli; cpl pi-query; cpl pi-translation;",
"pi": "cpl prod-install",
"pi-core": "pip install cpl-core --pre --upgrade --extra-index-url https://pip.sh-edraft.de",
"pi-cli": "pip install cpl-cli --pre --upgrade --extra-index-url https://pip.sh-edraft.de",
"pi-discord": "pip install cpl-discord --pre --upgrade --extra-index-url https://pip.sh-edraft.de",
"pi-query": "pip install cpl-query --pre --upgrade --extra-index-url https://pip.sh-edraft.de",
"pi-translation": "pip install cpl-translation --pre --upgrade --extra-index-url https://pip.sh-edraft.de"
}
}
}

View File

@@ -1,6 +0,0 @@
from .application_abc import ApplicationABC
from .application_builder import ApplicationBuilder
from .application_builder_abc import ApplicationBuilderABC
from .application_extension_abc import ApplicationExtensionABC
from .startup_abc import StartupABC
from .startup_extension_abc import StartupExtensionABC

View File

@@ -1,97 +0,0 @@
from typing import Type, Optional, Callable, Union
from cpl.application.application_abc import ApplicationABC
from cpl.application.application_builder_abc import ApplicationBuilderABC
from cpl.application.application_extension_abc import ApplicationExtensionABC
from cpl.application.async_application_extension_abc import AsyncApplicationExtensionABC
from cpl.application.async_startup_abc import AsyncStartupABC
from cpl.application.async_startup_extension_abc import AsyncStartupExtensionABC
from cpl.application.startup_abc import StartupABC
from cpl.application.startup_extension_abc import StartupExtensionABC
from cpl.core.configuration.configuration import Configuration
from cpl.dependency.service_collection import ServiceCollection
from cpl.core.environment import Environment
class ApplicationBuilder(ApplicationBuilderABC):
    r"""This class is used to build an object of :class:`cpl.application.application_abc.ApplicationABC`

    Collects a startup class plus startup/application extensions, then wires
    configuration and services together in :meth:`build` or :meth:`build_async`.

    Parameter:
        app: Type[:class:`cpl.application.application_abc.ApplicationABC`]
            Application to build
    """

    def __init__(self, app: Type[ApplicationABC]):
        ApplicationBuilderABC.__init__(self)

        self._app = app
        self._startup: Optional[StartupABC | AsyncStartupABC] = None
        self._services = ServiceCollection()

        self._app_extensions: list[Type[ApplicationExtensionABC | AsyncApplicationExtensionABC]] = []
        # Fixed annotation: this list holds *startup extension* classes
        # (StartupExtensionABC / AsyncStartupExtensionABC) as filtered by
        # use_extension, not AsyncStartupABC as previously annotated.
        self._startup_extensions: list[Type[StartupExtensionABC | AsyncStartupExtensionABC]] = []

    def use_startup(self, startup: Type[StartupABC | AsyncStartupABC]) -> "ApplicationBuilder":
        """Registers (and instantiates) the startup class; returns self for chaining."""
        self._startup = startup()
        return self

    def use_extension(
        self,
        extension: Type[
            ApplicationExtensionABC | AsyncApplicationExtensionABC | StartupExtensionABC | AsyncStartupExtensionABC
        ],
    ) -> "ApplicationBuilder":
        """Registers an extension class once, routed by kind; returns self for chaining.

        Application extensions run after the service provider is built;
        startup extensions run while configuration/services are assembled.
        """
        if (
            issubclass(extension, ApplicationExtensionABC) or issubclass(extension, AsyncApplicationExtensionABC)
        ) and extension not in self._app_extensions:
            self._app_extensions.append(extension)
        elif (
            issubclass(extension, StartupExtensionABC) or issubclass(extension, AsyncStartupExtensionABC)
        ) and extension not in self._startup_extensions:
            self._startup_extensions.append(extension)

        return self

    def _build_startup(self):
        """Runs all sync startup extensions, then the startup class (config first, then services)."""
        for ex in self._startup_extensions:
            extension = ex()
            extension.configure_configuration(Configuration, Environment)
            extension.configure_services(self._services, Environment)

        if self._startup is not None:
            self._startup.configure_configuration(Configuration, Environment)
            self._startup.configure_services(self._services, Environment)

    async def _build_async_startup(self):
        """Async variant of :meth:`_build_startup`; awaits every configure hook."""
        for ex in self._startup_extensions:
            extension = ex()
            await extension.configure_configuration(Configuration, Environment)
            await extension.configure_services(self._services, Environment)

        if self._startup is not None:
            await self._startup.configure_configuration(Configuration, Environment)
            await self._startup.configure_services(self._services, Environment)

    def build(self) -> ApplicationABC:
        """Runs startup, builds the service provider, runs app extensions, constructs the app."""
        self._build_startup()
        config = Configuration
        services = self._services.build_service_provider()

        for ex in self._app_extensions:
            extension = ex()
            extension.run(config, services)

        return self._app(services)

    async def build_async(self) -> ApplicationABC:
        """Async variant of :meth:`build`; awaits extension run hooks."""
        await self._build_async_startup()
        config = Configuration
        services = self._services.build_service_provider()

        for ex in self._app_extensions:
            extension = ex()
            await extension.run(config, services)

        return self._app(services)

View File

@@ -1,14 +0,0 @@
from abc import ABC, abstractmethod
from cpl.core.configuration.configuration import Configuration
from cpl.dependency import ServiceProviderABC
class ApplicationExtensionABC(ABC):
    """Contract for application extensions executed after the service provider is built."""

    @abstractmethod
    def __init__(self):
        """Extensions must be constructible without arguments."""

    @abstractmethod
    def run(self, config: Configuration, services: ServiceProviderABC):
        """Executes the extension.

        Parameter:
            config: the global configuration store
            services: the built service provider
        """

View File

@@ -1,14 +0,0 @@
from abc import ABC, abstractmethod
from cpl.core.configuration.configuration import Configuration
from cpl.dependency import ServiceProviderABC
class AsyncApplicationExtensionABC(ABC):
    """Contract for asynchronous application extensions; `run` is awaited by the builder."""

    @abstractmethod
    def __init__(self):
        """Extensions must be constructible without arguments."""

    @abstractmethod
    async def run(self, config: Configuration, services: ServiceProviderABC):
        """Executes the extension asynchronously.

        Parameter:
            config: the global configuration store
            services: the built service provider
        """

View File

@@ -1,23 +0,0 @@
from abc import ABC, abstractmethod
from cpl.dependency.service_collection import ServiceCollection
class AsyncStartupABC(ABC):
    r"""ABC for the async startup class"""

    @abstractmethod
    def __init__(self):
        pass

    # Signature fix: ApplicationBuilder._build_async_startup awaits
    # `configure_configuration(Configuration, Environment)`, so the hook
    # must accept (config, env) like its sync counterpart StartupABC.
    @abstractmethod
    async def configure_configuration(self, config, env):
        r"""Creates configuration of application

        Parameter:
            config: :class:`cpl.core.configuration.configuration.Configuration`
            env: :class:`cpl.core.environment.environment.Environment`
        """

    # Signature fix: the builder awaits `configure_services(services, Environment)`;
    # the previous signature took only the service collection.
    @abstractmethod
    async def configure_services(self, service: ServiceCollection, env):
        r"""Creates service provider

        Parameter:
            services: :class:`cpl.dependency.service_collection`
            env: :class:`cpl.core.environment.environment.Environment`
        """

View File

@@ -1,31 +0,0 @@
from abc import ABC, abstractmethod
from cpl.core.configuration.configuration import Configuration
from cpl.dependency.service_collection import ServiceCollection
from cpl.core.environment.environment import Environment
class AsyncStartupExtensionABC(ABC):
    """Contract for asynchronous startup extension classes.

    Implementations contribute configuration and service registrations
    while the application is being assembled.
    """

    @abstractmethod
    def __init__(self):
        """Extensions must be constructible without arguments."""

    @abstractmethod
    async def configure_configuration(self, config: Configuration, env: Environment):
        """Contributes application configuration.

        Parameter:
            config: :class:`cpl.core.configuration.configuration_abc.Configuration`
            env: :class:`cpl.core.environment.application_environment_abc`
        """

    @abstractmethod
    async def configure_services(self, service: ServiceCollection, env: Environment):
        """Registers services on the given collection.

        Parameter:
            services: :class:`cpl.dependency.service_collection`
            env: :class:`cpl.core.environment.application_environment_abc`
        """

View File

@@ -1,31 +0,0 @@
from abc import ABC, abstractmethod
from cpl.core.configuration import Configuration
from cpl.dependency.service_collection import ServiceCollection
from cpl.core.environment import Environment
class StartupABC(ABC):
    """Contract for the application's startup class.

    The builder calls configure_configuration first, then
    configure_services, before building the service provider.
    """

    @abstractmethod
    def __init__(self):
        """Startup classes must be constructible without arguments."""

    @abstractmethod
    def configure_configuration(self, config: Configuration, env: Environment):
        """Contributes application configuration.

        Parameter:
            config: :class:`cpl.core.configuration.configuration_abc.ConfigurationABC`
            env: :class:`cpl.core.environment.application_environment_abc`
        """

    @abstractmethod
    def configure_services(self, service: ServiceCollection, env: Environment):
        """Registers services on the given collection.

        Parameter:
            services: :class:`cpl.dependency.service_collection`
            env: :class:`cpl.core.environment.application_environment_abc`
        """

View File

@@ -1,33 +0,0 @@
from abc import ABC, abstractmethod
from cpl.core.configuration import Configuration
from cpl.dependency.service_collection import ServiceCollection
from cpl.core.environment.environment import Environment
class StartupExtensionABC(ABC):
    """Contract for synchronous startup extension classes.

    Implementations contribute configuration and service registrations
    while the application is being assembled.
    """

    @abstractmethod
    def __init__(self):
        """Extensions must be constructible without arguments."""

    @abstractmethod
    def configure_configuration(self, config: Configuration, env: Environment):
        """Contributes application configuration.

        Parameter:
            config: :class:`cpl.core.configuration.configuration_abc.ConfigurationABC`
            env: :class:`cpl.core.environment.application_environment_abc`
        """

    @abstractmethod
    def configure_services(self, service: ServiceCollection, env: Environment):
        """Registers services on the given collection.

        Parameter:
            services: :class:`cpl.dependency.service_collection`
            env: :class:`cpl.core.environment.application_environment_abc`
        """

View File

@@ -1,30 +0,0 @@
# Packaging metadata for the cpl-application distribution (PEP 621).
[build-system]
requires = ["setuptools>=70.1.0", "wheel>=0.43.0"]
build-backend = "setuptools.build_meta"

[project]
name = "cpl-application"
# Placeholder version; CI rewrites this line from version.txt (see package workflow).
version = "2024.7.0"
description = "CPL application"
# NOTE(review): PEP 621 expects `readme` to be a file path (e.g. "README.md")
# or a table, not free text — confirm and fix; build tooling may reject this.
readme ="CPL application package"
requires-python = ">=3.12"
license = { text = "MIT" }
authors = [
    { name = "Sven Heidemann", email = "sven.heidemann@sh-edraft.de" }
]
keywords = ["cpl", "application", "backend", "shared", "library"]
# Dependencies are sourced from requirements files, see [tool.setuptools.dynamic].
dynamic = ["dependencies", "optional-dependencies"]

[project.urls]
Homepage = "https://www.sh-edraft.de"

[tool.setuptools.packages.find]
where = ["."]
include = ["cpl*"]

[tool.setuptools.dynamic]
dependencies = { file = ["requirements.txt"] }
optional-dependencies.dev = { file = ["requirements.dev.txt"] }

View File

@@ -1 +0,0 @@
black==25.1.0

View File

@@ -1,2 +0,0 @@
cpl-core
cpl-dependency

View File

@@ -1 +0,0 @@

View File

@@ -1,2 +0,0 @@
from .configuration import Configuration
from .configuration_model_abc import ConfigurationModelABC

View File

@@ -1,134 +0,0 @@
import inspect
import json
import os
import sys
from typing import Any
from cpl.core.configuration.configuration_model_abc import ConfigurationModelABC
from cpl.core.console.console import Console
from cpl.core.console.foreground_color_enum import ForegroundColorEnum
from cpl.core.environment.environment import Environment
from cpl.core.typing import D, T
from cpl.core.utils.json_processor import JSONProcessor
class Configuration:
    r"""Process-wide key/value configuration store.

    JSON files registered via :meth:`add_json_file` are deserialized into
    :class:`ConfigurationModelABC` subclasses whose class name matches a
    top-level JSON key (with or without a trailing "Settings"); arbitrary
    values can also be stored via :meth:`set` / :meth:`get`.
    """

    # Shared class-level store; keys are class names or plain strings.
    _config = {}

    @classmethod
    def _print(cls, color: ForegroundColorEnum, message: str):
        r"""Writes a colored "[CONFIG] <message>" line and resets the console color."""
        Console.set_foreground_color(color)
        Console.write_line(f"[CONFIG] {message}")
        Console.set_foreground_color(ForegroundColorEnum.default)

    @classmethod
    def _print_info(cls, message: str):
        r"""Prints an info message in green.

        Parameter:
            message: :class:`str`
                Info message
        """
        cls._print(ForegroundColorEnum.green, message)

    @classmethod
    def _print_warn(cls, message: str):
        r"""Prints a warning in yellow.

        Parameter:
            message: :class:`str`
                Warning message
        """
        cls._print(ForegroundColorEnum.yellow, message)

    @classmethod
    def _print_error(cls, message: str):
        r"""Prints an error in red.

        Parameter:
            message: :class:`str`
                Error message
        """
        cls._print(ForegroundColorEnum.red, message)

    @classmethod
    def _load_json_file(cls, file: str, output: bool) -> dict:
        r"""Reads the json file

        Parameter:
            file: :class:`str`
                Name of the file
            output: :class:`bool`
                Specifies whether an output should take place

        Returns:
            Object of :class:`dict`; empty dict when the file cannot be read.
        """
        try:
            with open(file, encoding="utf-8") as cfg:
                json_cfg = json.load(cfg)
            if output:
                cls._print_info(f"Loaded config file: {file}")
            return json_cfg
        except Exception as e:
            cls._print_error(f"Cannot load config file: {file}! -> {e}")
            return {}

    @classmethod
    def add_json_file(cls, name: str, optional: bool = None, output: bool = True, path: str = None):
        r"""Loads a JSON config file and binds matching ConfigurationModelABC subclasses.

        Parameter:
            name: :class:`str`
                File name; absolute paths are used as-is, otherwise resolved
                against `path` or the current working directory
            optional: :class:`bool`
                When not True, a missing file terminates the process
            output: :class:`bool`
                Specifies whether console output should take place
            path: :class:`str`
                Optional base directory overriding the cwd
        """
        if os.path.isabs(name):
            file_path = name
        else:
            path_root = Environment.get_cwd()
            if path is not None:
                path_root = path
            if str(path_root).endswith("/") and not name.startswith("/"):
                file_path = f"{path_root}{name}"
            else:
                file_path = f"{path_root}/{name}"

        if not os.path.isfile(file_path):
            if optional is not True:
                if output:
                    cls._print_error(f"File not found: {file_path}")
                # Bugfix: exit non-zero so callers/CI see the failure;
                # bare sys.exit() exits with status 0 (success).
                sys.exit(1)
            if output:
                cls._print_warn(f"Not Loaded config file: {file_path}")
            return None

        config_from_file = cls._load_json_file(file_path, output)
        # Bind each matching top-level JSON section to its settings model.
        for sub in ConfigurationModelABC.__subclasses__():
            for key, value in config_from_file.items():
                if sub.__name__ != key and sub.__name__.replace("Settings", "") != key:
                    continue
                configuration = JSONProcessor.process(sub, value)
                cls.set(sub, configuration)

    @classmethod
    def set(cls, key: Any, value: T):
        r"""Stores `value` under `key`; class objects are keyed by their name."""
        if inspect.isclass(key):
            key = key.__name__
        cls._config[key] = value

    @classmethod
    def get(cls, key: Any, default: D = None) -> T | D:
        r"""Returns the stored value for `key` (class or string), or `default`."""
        if inspect.isclass(key):
            key = key.__name__
        return cls._config.get(key, default)

View File

@@ -1,5 +0,0 @@
from abc import ABC
class ConfigurationModelABC(ABC):
    """Marker base class for settings models.

    Configuration.add_json_file scans ConfigurationModelABC.__subclasses__()
    and binds each subclass to the JSON section whose key matches the class
    name (with or without a trailing "Settings").
    """

    pass

View File

@@ -1,4 +0,0 @@
from .background_color_enum import BackgroundColorEnum
from .console import Console
from ._call import ConsoleCall
from .foreground_color_enum import ForegroundColorEnum

View File

@@ -1,2 +0,0 @@
from .environment_enum import EnvironmentEnum
from .environment import Environment

View File

@@ -1,68 +0,0 @@
import os
from socket import gethostname
from typing import Optional, Type
from cpl.core.environment.environment_enum import EnvironmentEnum
from cpl.core.typing import T
from cpl.core.utils.get_value import get_value
class Environment:
    r"""Static accessor for the application environment.

    All state lives in process environment variables (``os.environ``);
    the class is never instantiated. (Previous docstring documented a
    constructor parameter, but the class has no constructor.)
    """

    @classmethod
    def get_environment(cls):
        """Returns the current environment name; defaults to production."""
        return cls.get("ENVIRONMENT", str, EnvironmentEnum.production.value)

    @classmethod
    def set_environment(cls, environment: str):
        """Sets the environment after validating it against EnvironmentEnum values."""
        assert environment is not None and environment != "", "environment must not be None or empty"
        assert environment.lower() in [
            e.value for e in EnvironmentEnum
        ], f"environment must be one of {[e.value for e in EnvironmentEnum]}"
        cls.set("ENVIRONMENT", environment.lower())

    @classmethod
    def get_app_name(cls) -> Optional[str]:
        """Returns the configured application name, or None when unset."""
        # Annotation fix: cls.get returns None when APP_NAME is not set.
        return cls.get("APP_NAME", str)

    @classmethod
    def set_app_name(cls, app_name: str):
        """Stores the application name in the environment."""
        cls.set("APP_NAME", app_name)

    @staticmethod
    def get_host_name() -> str:
        """Returns the machine's host name."""
        return gethostname()

    @staticmethod
    def get_cwd() -> str:
        """Returns the current working directory."""
        return os.getcwd()

    @staticmethod
    def set_cwd(working_directory: str):
        """Changes the process working directory; rejects None/empty input."""
        assert working_directory is not None and working_directory != "", "working_directory must not be None or empty"
        os.chdir(working_directory)

    @staticmethod
    def set(key: str, value: T):
        """Stores `value` (stringified) under `key` in the process environment."""
        assert key is not None and key != "", "key must not be None or empty"
        os.environ[key] = str(value)

    @staticmethod
    def get(key: str, cast_type: Type[T], default: Optional[T] = None) -> Optional[T]:
        """
        Get an environment variable and cast it to a specified type.

        :param str key: The name of the environment variable.
        :param Type[T] cast_type: A callable to cast the variable's value.
        :param Optional[T] default: The default value to return if the variable is not found. Defaults to None.
        :return: The casted value, or `default` if the variable is not found.
        :rtype: Optional[T]
        """
        return get_value(dict(os.environ), key, cast_type, default)

View File

@@ -1,4 +0,0 @@
from .logger import Logger
from .logger_abc import LoggerABC
from .log_level_enum import LogLevelEnum
from .logging_settings import LogSettings

View File

@@ -1,11 +0,0 @@
from enum import Enum
class LogLevelEnum(Enum):
    """Severity levels for the logger, ordered least to most severe.

    Member order is significant: Logger compares levels by their position
    in this enum, so "off" suppresses everything and "fatal" ranks highest.
    """

    off = "OFF"      # logging disabled entirely
    trace = "TRC"    # very detailed application information
    debug = "DEB"    # detailed application state
    info = "INF"     # normal informational messages
    warning = "WAR"  # problems that may become fatal later
    error = "ERR"    # non-fatal errors
    fatal = "FAT"    # errors that cause the application to exit

View File

@@ -1,117 +0,0 @@
import os
import traceback
from datetime import datetime
from cpl.core.console import Console
from cpl.core.log.log_level_enum import LogLevelEnum
from cpl.core.log.logger_abc import LoggerABC
from cpl.core.typing import Messages, Source
class Logger(LoggerABC):
    """
    File + console logger.

    Every record is appended to ``logs/<prefix>_<date>.log`` and mirrored to the
    console, colored by the record's severity. The minimum level is shared by all
    instances via :meth:`set_level`.
    """

    _level = LogLevelEnum.info
    _levels = [x for x in LogLevelEnum]

    # ANSI color codes for different log levels
    _COLORS = {
        LogLevelEnum.trace: "\033[37m",  # Light Gray
        LogLevelEnum.debug: "\033[94m",  # Blue
        LogLevelEnum.info: "\033[92m",  # Green
        LogLevelEnum.warning: "\033[93m",  # Yellow
        LogLevelEnum.error: "\033[91m",  # Red
        LogLevelEnum.fatal: "\033[95m",  # Magenta
    }
    _RESET = "\033[0m"

    def __init__(self, source: Source, file_prefix: str = None):
        """
        :param Source source: Name of the component producing the log records; must be non-empty.
        :param str file_prefix: Log file name prefix; defaults to "app".
        """
        LoggerABC.__init__(self)
        assert source is not None and source != "", "Source cannot be None or empty"
        self._source = source
        self._file_prefix = "app" if file_prefix is None else file_prefix
        self._create_log_dir()

    @property
    def log_file(self):
        """Path of today's log file (one file per day and prefix)."""
        return f"logs/{self._file_prefix}_{datetime.now().strftime('%Y-%m-%d')}.log"

    @staticmethod
    def _create_log_dir():
        # Create the logs directory once; later instances simply reuse it.
        if not os.path.exists("logs"):
            os.makedirs("logs")

    @classmethod
    def set_level(cls, level: LogLevelEnum):
        """Set the global minimum log level. :raises ValueError: on an unknown level."""
        if level not in cls._levels:
            raise ValueError(f"Invalid log level: {level}")
        cls._level = level

    @staticmethod
    def _ensure_file_size(log_file: str):
        # Rotate once the file exceeds 0.5 MB (the old comment claimed 300MB, which
        # contradicted the code): rename the current file with a time suffix so the
        # next write starts a fresh one.
        if not os.path.exists(log_file) or os.path.getsize(log_file) <= 0.5 * 1024 * 1024:
            return
        os.rename(
            log_file,
            f"{log_file.split('.log')[0]}_{datetime.now().strftime('%H-%M-%S')}.log",
        )

    def _write_log_to_file(self, content: str):
        file = self.log_file
        self._ensure_file_size(file)
        # Mode "a" creates the file when missing; the with-block closes it
        # (the former explicit close() inside the with-block was redundant).
        with open(file, "a") as log_file:
            log_file.write(content + "\n")

    def _log(self, level: LogLevelEnum, *messages: Messages):
        try:
            # Skip records below the configured minimum level.
            if self._levels.index(level) < self._levels.index(self._level):
                return

            timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
            formatted_message = self._format_message(level.value, timestamp, *messages)
            self._write_log_to_file(formatted_message)
            # Bugfix: color by the record's level, not by the configured minimum
            # level (previously every line was tinted with _level's color).
            color = self._COLORS.get(level, self._RESET)
            Console.write_line(f"{color}{formatted_message}{self._RESET}")
        except Exception as e:
            # Logging must never take the application down.
            print(f"Error while logging: {e} -> {traceback.format_exc()}")

    def _format_message(self, level: str, timestamp, *messages: Messages) -> str:
        # *messages is always a tuple here; drop Nones and stringify the rest.
        parts = [str(message) for message in messages if message is not None]
        return f"<{timestamp}> [{level.upper():^3}] [{self._file_prefix}] - [{self._source}]: {' '.join(parts)}"

    def header(self, string: str):
        self._log(LogLevelEnum.info, string)

    def trace(self, *messages: Messages):
        self._log(LogLevelEnum.trace, *messages)

    def debug(self, *messages: Messages):
        self._log(LogLevelEnum.debug, *messages)

    def info(self, *messages: Messages):
        self._log(LogLevelEnum.info, *messages)

    def warning(self, *messages: Messages):
        self._log(LogLevelEnum.warning, *messages)

    def error(self, message, e: Exception = None):
        self._log(LogLevelEnum.error, message, f"{e} -> {traceback.format_exc()}" if e else None)

    def fatal(self, message, e: Exception = None, prevent_quit: bool = False):
        """Log at fatal level and terminate the process unless *prevent_quit* is True."""
        self._log(LogLevelEnum.fatal, message, f"{e} -> {traceback.format_exc()}" if e else None)
        if not prevent_quit:
            # raise SystemExit instead of the site-provided exit() builtin so this
            # also works when the site module is absent.
            raise SystemExit(-1)

View File

@@ -1,3 +0,0 @@
from .bool_pipe import BoolPipe
from .ip_address_pipe import IPAddressPipe
from .pipe_abc import PipeABC

View File

@@ -1,13 +0,0 @@
from cpl.core.pipes.pipe_abc import PipeABC
from cpl.core.typing import T
class BoolPipe[bool](PipeABC):
@staticmethod
def to_str(value: T, *args):
return str(value).lower()
@staticmethod
def from_str(value: str, *args) -> T:
return value in ("True", "true", "1", "yes", "y", "Y")

View File

@@ -1,38 +0,0 @@
from cpl.core.pipes.pipe_abc import PipeABC
from cpl.core.typing import T
class IPAddressPipe[list](PipeABC):
@staticmethod
def to_str(value: T, *args) -> str:
string = ""
if len(value) != 4:
raise ValueError("Invalid IP")
for i in range(0, len(value)):
byte = value[i]
if not 0 <= byte <= 255:
raise ValueError("Invalid IP")
if i == len(value) - 1:
string += f"{byte}"
else:
string += f"{byte}."
return string
@staticmethod
def from_str(value: str, *args) -> T:
parts = value.split(".")
if len(parts) != 4:
raise Exception("Invalid IP")
result = []
for part in parts:
byte = int(part)
if not 0 <= byte <= 255:
raise Exception("Invalid IP")
result.append(byte)
return result

View File

@@ -1,16 +0,0 @@
from abc import ABC, abstractmethod
from typing import Generic
from cpl.core.typing import T
class PipeABC(ABC, Generic[T]):
    """Abstract base for bidirectional value <-> string conversion pipes."""

    @staticmethod
    @abstractmethod
    def to_str(value: T, *args) -> str:
        """Render *value* as a string."""
        pass

    @staticmethod
    @abstractmethod
    def from_str(value: str, *args) -> T:
        """Parse a string back into a value of type T."""
        pass

View File

@@ -1,2 +0,0 @@
from .time_format_settings import TimeFormatSettings
from .time_format_settings_names_enum import TimeFormatSettingsNamesEnum

View File

@@ -1,16 +0,0 @@
from typing import TypeVar, Any
from uuid import UUID
# Generic type variables shared across the cpl packages.
T = TypeVar("T")
D = TypeVar("D")
R = TypeVar("R")
Service = TypeVar("Service")
Source = TypeVar("Source")
# A single log/message payload or a list of payloads.
Messages = list[Any] | Any
# Identifier aliases: string/UUID based, serial integer based, or either.
UuidId = str | UUID
SerialId = int
Id = UuidId | SerialId

View File

@@ -1,5 +0,0 @@
from .base64 import Base64
from .credential_manager import CredentialManager
from .json_processor import JSONProcessor
from .pip import Pip
from .string import String

View File

@@ -1,43 +0,0 @@
import base64
from typing import Union
class Base64:
    """Helpers for UTF-8 <-> base64 string conversion."""

    @staticmethod
    def encode(string: str) -> str:
        """Return the base64 representation of a UTF-8 string."""
        raw = string.encode("utf-8")
        return base64.b64encode(raw).decode("utf-8")

    @staticmethod
    def decode(string: str) -> str:
        """Decode a base64 payload back into a UTF-8 string."""
        raw = base64.b64decode(string)
        return raw.decode("utf-8")

    @staticmethod
    def is_b64(sb: Union[str, bytes]) -> bool:
        """
        Return True when *sb* is valid base64, i.e. it round-trips through
        decode/encode unchanged. Any other input (non-ASCII text, wrong type,
        malformed payload) yields False.
        """
        try:
            if isinstance(sb, bytes):
                candidate = sb
            elif isinstance(sb, str):
                # Non-ASCII characters cannot appear in base64; the encode below
                # raises and we fall through to False.
                candidate = bytes(sb, "ascii")
            else:
                raise ValueError("Argument must be string or bytes")
            decoded = base64.b64decode(candidate)
            return base64.b64encode(decoded) == candidate
        except ValueError:
            return False

View File

@@ -1,56 +0,0 @@
from typing import Type, Optional
from cpl.core.typing import T
def get_value(
    source: dict,
    key: str,
    cast_type: Type[T],
    default: Optional[T] = None,
    list_delimiter: str = ",",
) -> Optional[T]:
    """
    Get value from source dictionary and cast it to a specified type.

    :param dict source: The source dictionary.
    :param str key: The key to look up.
    :param Type[T] cast_type: The target type; plain types, bool and list[X] are supported.
    :param Optional[T] default: Returned when the key is missing or the cast fails. Defaults to None.
    :param str list_delimiter: The delimiter to split the value into a list. Defaults to ",".
    :return: The casted value, or the default if the key is not found or not castable.
    :rtype: Optional[T]
    """
    if key not in source:
        return default

    value = source[key]
    # For generics like list[int] the runtime checks need the origin type (list).
    origin = cast_type.__origin__ if hasattr(cast_type, "__origin__") else cast_type

    if isinstance(value, origin):
        # Already the right runtime type; for list[X] still cast each element.
        if origin is list and hasattr(cast_type, "__args__"):
            subtype = cast_type.__args__[0]
            return [subtype(item) for item in value]
        return value

    try:
        if cast_type == bool:
            # str() first so non-string inputs (e.g. the int 1) no longer crash
            # with an uncaught AttributeError on .lower().
            return str(value).lower() in ["true", "1"]

        if origin is list:
            if not (value.startswith("[") and value.endswith("]")) and list_delimiter not in value:
                raise ValueError("List values must be enclosed in square brackets or use a delimiter.")
            if value.startswith("[") and value.endswith("]"):
                value = value[1:-1]
            items = value.split(list_delimiter)
            subtype = cast_type.__args__[0] if hasattr(cast_type, "__args__") else None
            return [subtype(item) if subtype is not None else item for item in items]

        return cast_type(value)
    except (ValueError, TypeError, AttributeError):
        # AttributeError covers non-string inputs hitting str-only operations
        # (startswith/split); treat any failed cast as "use the default".
        return default

View File

@@ -1,29 +0,0 @@
[build-system]
requires = ["setuptools>=70.1.0", "wheel>=0.43.0"]
build-backend = "setuptools.build_meta"
[project]
name = "cpl-core"
version = "2024.7.0"
description = "CPL core"
readme = "README.md" # NOTE(review): PEP 621 'readme' must be a file path or table; the previous value "CPL core package" is a description and belongs in 'description' — confirm README.md exists
requires-python = ">=3.12"
license = "MIT"
authors = [
{ name = "Sven Heidemann", email = "sven.heidemann@sh-edraft.de" }
]
keywords = ["cpl", "core", "backend", "shared", "library"]
dynamic = ["dependencies", "optional-dependencies"]
[project.urls]
Homepage = "https://www.sh-edraft.de"
[tool.setuptools.packages.find]
where = ["."]
include = ["cpl*"]
[tool.setuptools.dynamic]
dependencies = { file = ["requirements.txt"] }
optional-dependencies.dev = { file = ["requirements.dev.txt"] }

View File

@@ -1 +0,0 @@
black==25.1.0

View File

@@ -1,6 +0,0 @@
art==6.5
colorama==0.4.6
tabulate==0.9.0
termcolor==3.1.0
mysql-connector-python==9.4.0
pynput==1.8.1

View File

@@ -1,41 +0,0 @@
from typing import Type
from cpl.dependency import ServiceCollection as _ServiceCollection
from . import mysql as _mysql
from . import postgres as _postgres
from .internal_tables import InternalTables
def _add(collection: _ServiceCollection,db_context: Type, default_port: int, server_type: str):
    """
    Register the shared database services for one server flavour.

    :param _ServiceCollection collection: Collection to register the singletons in.
    :param Type db_context: Concrete DBContext implementation bound to DBContextABC.
    :param int default_port: Fallback port stored in the configuration.
    :param str server_type: ServerTypes value identifying the database flavour.
    """
    # Imports are deferred so cpl-database stays an optional dependency.
    from cpl.core.console import Console
    from cpl.core.configuration import Configuration
    from cpl.database.abc.db_context_abc import DBContextABC
    from cpl.database.model.server_type import ServerTypes, ServerType
    from cpl.database.model.database_settings import DatabaseSettings
    from cpl.database.service.migration_service import MigrationService
    from cpl.database.schema.executed_migration_dao import ExecutedMigrationDao
    try:
        ServerType.set_server_type(ServerTypes(server_type))
        Configuration.set("DB_DEFAULT_PORT", default_port)
        collection.add_singleton(DBContextABC, db_context)
        collection.add_singleton(ExecutedMigrationDao)
        collection.add_singleton(MigrationService)
    except ImportError as e:
        # Best-effort: report the missing optional package instead of raising.
        Console.error("cpl-database is not installed", str(e))
def add_mysql(collection: _ServiceCollection):
    """Register the MySQL DB context (default port 3306) with the service collection."""
    from cpl.database.mysql.db_context import DBContext
    from cpl.database.model import ServerTypes
    _add(collection, DBContext, 3306, ServerTypes.MYSQL.value)
def add_postgres(collection: _ServiceCollection):
    """Register the PostgreSQL DB context (default port 5432) with the service collection."""
    # Bugfix: previously imported the MySQL DBContext (copy-paste from add_mysql),
    # so the postgres flavour was wired to the wrong context implementation.
    from cpl.database.postgres.db_context import DBContext
    from cpl.database.model import ServerTypes
    _add(collection, DBContext, 5432, ServerTypes.POSTGRES.value)
# Expose the providers as ServiceCollection modules so consumers can opt in per flavour.
_ServiceCollection.with_module(add_mysql, _mysql.__name__)
_ServiceCollection.with_module(add_postgres, _postgres.__name__)

View File

@@ -1,68 +0,0 @@
import textwrap
from typing import Callable
class ExternalDataTempTableBuilder:
    """
    Fluent builder for the SQL of a TEMP table that mirrors externally provided
    data, so external values can be joined against regular tables in one query.
    """

    def __init__(self):
        self._table_name = None
        self._fields: dict[str, str] = {}
        self._primary_key = "id"
        self._join_ref_table = None
        self._value_getter = None

    @property
    def table_name(self) -> str:
        """Name of the temp table (always suffixed with ``_temp``)."""
        return self._table_name

    @property
    def fields(self) -> dict[str, str]:
        """Mapping of column name to SQL type."""
        return self._fields

    @property
    def primary_key(self) -> str:
        """Primary key column name; defaults to ``id``."""
        return self._primary_key

    @property
    def join_ref_table(self) -> str:
        """The original (referenced) table name this temp table joins against."""
        return self._join_ref_table

    def with_table_name(self, table_name: str) -> "ExternalDataTempTableBuilder":
        """Set the referenced table; the temp table name derives from its last path segment."""
        self._join_ref_table = table_name
        # Strip a schema prefix like "app.users" down to "users".
        if "." in table_name:
            table_name = table_name.split(".")[-1]
        if not table_name.endswith("_temp"):
            table_name = f"{table_name}_temp"
        self._table_name = table_name
        return self

    def with_field(self, name: str, sql_type: str, primary=False) -> "ExternalDataTempTableBuilder":
        """Add a column; when *primary* is True the column becomes the primary key."""
        if primary:
            sql_type += " PRIMARY KEY"
            self._primary_key = name
        self._fields[name] = sql_type
        return self

    def with_value_getter(self, value_getter: Callable) -> "ExternalDataTempTableBuilder":
        """Set the async callable that yields the row value tuples (as SQL fragments)."""
        self._value_getter = value_getter
        return self

    async def build(self) -> str:
        """
        Render the DROP/CREATE/INSERT statements for the temp table.

        Bugfix: the INSERT statement is omitted when the value getter yields no
        rows, because ``INSERT INTO t VALUES ;`` is invalid SQL.
        """
        assert self._table_name is not None, "Table name is required"
        assert self._value_getter is not None, "Value getter is required"
        rows = [f"{value}" for value in await self._value_getter()]
        columns = ", ".join(f"{name} {sql_type}" for name, sql_type in self._fields.items())
        sql = textwrap.dedent(
            f"""
            DROP TABLE IF EXISTS {self._table_name};
            CREATE TEMP TABLE {self._table_name} (
                {columns}
            );
            """
        )
        if rows:
            sql += f"INSERT INTO {self._table_name} VALUES {', '.join(rows)};\n"
        return sql

View File

@@ -1,5 +0,0 @@
from .connection_abc import ConnectionABC
from .db_context_abc import DBContextABC
from .db_join_model_abc import DbJoinModelABC
from .db_model_abc import DbModelABC
from .db_model_dao_abc import DbModelDaoABC

View File

@@ -1,875 +0,0 @@
import datetime
from abc import ABC, abstractmethod
from enum import Enum
from types import NoneType
from typing import Generic, Optional, Union, Type, List, Any
from cpl.core.typing import T, Id
from cpl.core.utils import String
from cpl.core.utils.get_value import get_value
from cpl.database._external_data_temp_table_builder import ExternalDataTempTableBuilder
from cpl.database.abc.db_context_abc import DBContextABC
from cpl.database.const import DATETIME_FORMAT
from cpl.database.db_logger import DBLogger
from cpl.database.postgres.sql_select_builder import SQLSelectBuilder
from cpl.database.typing import T_DBM, Attribute, AttributeFilters, AttributeSorts
class DataAccessObjectABC(ABC, Generic[T_DBM]):
@abstractmethod
def __init__(self, source: str, model_type: Type[T_DBM], table_name: str):
    """
    Base constructor wiring the DAO to the global DB context and logger.

    :param str source: Source name used for the DB logger.
    :param Type[T_DBM] model_type: Model class instantiated by to_object.
    :param str table_name: Name of the backing database table.
    """
    from cpl.dependency.service_provider_abc import ServiceProviderABC

    self._db = ServiceProviderABC.get_global_provider().get_service(DBContextABC)
    # Fix: the logger/model_type/table_name assignments were duplicated verbatim.
    self._logger = DBLogger(source)
    self._model_type = model_type
    self._table_name = table_name
    self._default_filter_condition = None
    self.__attributes: dict[str, type] = {}
    # Maps BOTH attribute names and db column names (and aliases) to the db column name.
    self.__db_names: dict[str, str] = {}
    self.__foreign_tables: dict[str, tuple[str, str]] = {}
    self.__foreign_table_keys: dict[str, str] = {}
    self.__foreign_dao: dict[str, "DataAccessObjectABC"] = {}
    self.__date_attributes: set[str] = set()
    self.__ignored_attributes: set[str] = set()
    self.__primary_key = "id"
    self.__primary_key_type = int
    self._external_fields: dict[str, ExternalDataTempTableBuilder] = {}
@property
def table_name(self) -> str:
    """Name of the backing database table."""
    return self._table_name
def has_attribute(self, attr_name: Attribute) -> bool:
    """
    Check if the attribute exists in the DAO
    :param Attribute attr_name: Name of the attribute
    :return: True if the attribute exists, False otherwise
    :rtype: bool
    """
    return attr_name in self.__attributes
def attribute(
    self,
    attr_name: Attribute,
    attr_type: type,
    db_name: str = None,
    ignore=False,
    primary_key=False,
    aliases: list[str] = None,
):
    """
    Add an attribute for db and object mapping to the data access object
    :param Attribute attr_name: Name of the attribute in the object
    :param type attr_type: Python type of the attribute to cast db value to
    :param str db_name: Name of the field in the database, if None the attribute lowered attr_name without "_" is used
    :param bool ignore: Defines if field is ignored for create and update (for e.g. auto increment fields or created/updated fields)
    :param bool primary_key: Defines if field is the primary key
    :param list[str] aliases: List of aliases for the attribute name
    :return:
    """
    # Allow passing the property object itself; use its getter's name.
    if isinstance(attr_name, property):
        attr_name = attr_name.fget.__name__
    self.__attributes[attr_name] = attr_type
    if ignore:
        self.__ignored_attributes.add(attr_name)
    if not db_name:
        db_name = attr_name.lower().replace("_", "")
    # Register both directions so lookups by attribute name or column name resolve.
    self.__db_names[attr_name] = db_name
    self.__db_names[db_name] = db_name
    if aliases is not None:
        for alias in aliases:
            if alias in self.__db_names:
                raise ValueError(f"Alias {alias} already exists")
            self.__db_names[alias] = db_name
    if primary_key:
        self.__primary_key = db_name
        self.__primary_key_type = attr_type
    # Remember date attributes (under both names) so queries can format them consistently.
    if attr_type in [datetime, datetime.datetime]:
        self.__date_attributes.add(attr_name)
        self.__date_attributes.add(db_name)
def reference(
    self,
    attr: Attribute,
    primary_attr: Attribute,
    foreign_attr: Attribute,
    table_name: str,
    reference_dao: "DataAccessObjectABC" = None,
):
    """
    Add a reference to another table for the given attribute
    :param Attribute attr: Name of the attribute in the object
    :param Attribute primary_attr: Name of the primary key in the foreign object
    :param Attribute foreign_attr: Name of the foreign key in the object
    :param str table_name: Name of the table to reference
    :param DataAccessObjectABC reference_dao: The data access object for the referenced table
    :return:
    """
    # Accept property objects and normalize key names to db column form.
    if isinstance(attr, property):
        attr = attr.fget.__name__
    if isinstance(primary_attr, property):
        primary_attr = primary_attr.fget.__name__
    primary_attr = primary_attr.lower().replace("_", "")
    if isinstance(foreign_attr, property):
        foreign_attr = foreign_attr.fget.__name__
    foreign_attr = foreign_attr.lower().replace("_", "")
    self.__foreign_table_keys[attr] = foreign_attr
    if reference_dao is not None:
        self.__foreign_dao[attr] = reference_dao
    # Self-references need no JOIN clause.
    if table_name == self._table_name:
        return
    self.__foreign_tables[attr] = (
        table_name,
        f"{table_name}.{primary_attr} = {self._table_name}.{foreign_attr}",
    )
def use_external_fields(self, builder: ExternalDataTempTableBuilder):
    """Register a temp-table builder whose fields can then be filtered/sorted on."""
    self._external_fields[builder.table_name] = builder
def to_object(self, result: dict) -> T_DBM:
    """
    Convert a result row from the database to a model instance.

    :param dict result: Single row mapping db column names to raw values.
    :return: A new instance of the configured model type.
    """
    # Build the reverse lookup once per row instead of scanning __db_names for
    # every column (the old next(...) scan was O(columns * names)).
    # __db_names maps BOTH attribute names and db names to the db name; keep the
    # FIRST key per db name (the attribute name, inserted first by attribute()),
    # matching the behaviour of the previous first-match scan.
    reverse_names: dict[str, str] = {}
    for key, db_name in self.__db_names.items():
        reverse_names.setdefault(db_name, key)

    value_map: dict[str, T] = {}
    for db_name, value in result.items():
        attr_name = reverse_names.get(db_name)
        if attr_name:
            value_map[attr_name] = self._get_value_from_sql(self.__attributes[attr_name], value)
    return self._model_type(**value_map)
def to_dict(self, obj: T_DBM) -> dict:
    """
    Convert an object to a dictionary
    :param T_DBM obj: Object to convert
    :return:
    """
    value_map: dict[str, Any] = {}
    for attr_name, attr_type in self.__attributes.items():
        value = getattr(obj, attr_name)
        # Normalize non-primitive values: dates to the DB format, enums to their value.
        if isinstance(value, datetime.datetime):
            value = value.strftime(DATETIME_FORMAT)
        elif isinstance(value, Enum):
            value = value.value
        value_map[attr_name] = value
    # Append external (temp table) fields; the primary key is already present.
    for ex_fname in self._external_fields:
        ex_field = self._external_fields[ex_fname]
        for ex_attr in ex_field.fields:
            if ex_attr == self.__primary_key:
                continue
            value_map[ex_attr] = getattr(obj, ex_attr, None)
    return value_map
async def count(self, filters: AttributeFilters = None) -> int:
    """Count the rows matching the optional filters; 0 when the query yields nothing."""
    result = await self._prepare_query(filters=filters, for_count=True)
    return result[0]["count"] if result else 0
async def get_history(
    self,
    entry_id: int,
    by_key: str = None,
    when: datetime = None,
    until: datetime = None,
    without_deleted: bool = False,
) -> list[T_DBM]:
    """
    Retrieve the history of an entry from the history table.
    :param entry_id: The ID of the entry to retrieve history for.
    :param by_key: The key to filter by (default is the primary key).
    :param when: A specific timestamp to filter the history.
    :param until: A timestamp to filter history entries up to a certain point.
    :param without_deleted: Exclude deleted entries if True.
    :return: A list of historical entries as objects.
    """
    f_tables = list(self.__foreign_tables.keys())
    # History rows live in a parallel "<table>_history" table.
    history_table = f"{self._table_name}_history"
    builder = SQLSelectBuilder(history_table, self.__primary_key)
    builder.with_attribute("*")
    builder.with_value_condition(
        f"{history_table}.{by_key or self.__primary_key}",
        "=",
        str(entry_id),
        f_tables,
    )
    if self._default_filter_condition:
        builder.with_condition(self._default_filter_condition, "", f_tables)
    if without_deleted:
        builder.with_value_condition(f"{history_table}.deleted", "=", "false", f_tables)
    if when:
        # Exact match on the update timestamp (compared through text representation).
        builder.with_value_condition(
            self._attr_from_date_to_char(f"{history_table}.updated"),
            "=",
            f"'{when.strftime(DATETIME_FORMAT)}'",
            f_tables,
        )
    if until:
        builder.with_value_condition(
            self._attr_from_date_to_char(f"{history_table}.updated"),
            "<=",
            f"'{until.strftime(DATETIME_FORMAT)}'",
            f_tables,
        )
    # Newest history entry first.
    builder.with_order_by(f"{history_table}.updated", "DESC")
    query = await builder.build()
    result = await self._db.select_map(query)
    return [self.to_object(x) for x in result] if result else []
async def get_all(self) -> List[T_DBM]:
    """Return every row of the table, ordered by primary key ascending."""
    result = await self._prepare_query(sorts=[{self.__primary_key: "asc"}])
    return [self.to_object(x) for x in result] if result else []
async def get_by_id(self, id: Union[int, str]) -> Optional[T_DBM]:
    """
    Get the row with the given primary key.

    :raises ValueError: If no such row exists. Consistency fix: get_* siblings
        (get_by, get_single_by) raise on a miss while find_* return None; this
        method previously duplicated find_by_id and silently returned None.
    """
    result = await self._prepare_query(filters=[{self.__primary_key: id}], sorts=[{self.__primary_key: "asc"}])
    if not result:
        raise ValueError("No result found")
    return self.to_object(result[0])
async def find_by_id(self, id: Union[int, str]) -> Optional[T_DBM]:
    """Return the row with the given primary key, or None when it does not exist."""
    result = await self._prepare_query(filters=[{self.__primary_key: id}], sorts=[{self.__primary_key: "asc"}])
    return self.to_object(result[0]) if result else None
async def get_by(
    self,
    filters: AttributeFilters = None,
    sorts: AttributeSorts = None,
    take: int = None,
    skip: int = None,
) -> list[T_DBM]:
    """
    Return all rows matching the filters.

    :raises ValueError: If the query yields no rows (use find_by for a soft lookup).
    """
    result = await self._prepare_query(filters, sorts, take, skip)
    if not result or len(result) == 0:
        raise ValueError("No result found")
    # result is guaranteed non-empty here; the former trailing
    # "if result else []" was dead code.
    return [self.to_object(x) for x in result]
async def get_single_by(
    self,
    filters: AttributeFilters = None,
    sorts: AttributeSorts = None,
    take: int = None,
    skip: int = None,
) -> T_DBM:
    """
    Return exactly one row matching the filters.
    :raises ValueError: When no row or more than one row matches.
    """
    result = await self._prepare_query(filters, sorts, take, skip)
    if not result:
        raise ValueError("No result found")
    if len(result) > 1:
        raise ValueError("More than one result found")
    return self.to_object(result[0])
async def find_by(
    self,
    filters: AttributeFilters = None,
    sorts: AttributeSorts = None,
    take: int = None,
    skip: int = None,
) -> list[T_DBM]:
    """Return all rows matching the filters; empty list when nothing matches."""
    result = await self._prepare_query(filters, sorts, take, skip)
    return [self.to_object(x) for x in result] if result else []
async def find_single_by(
    self,
    filters: AttributeFilters = None,
    sorts: AttributeSorts = None,
    take: int = None,
    skip: int = None,
) -> Optional[T_DBM]:
    """
    Return the single row matching the filters, or None when nothing matches.
    :raises ValueError: When more than one row matches.
    """
    result = await self._prepare_query(filters, sorts, take, skip)
    # Guard against a falsy/None result before calling len(): sibling methods
    # treat a falsy result as "no rows", and len(None) would raise TypeError.
    if result and len(result) > 1:
        raise ValueError("More than one result found")
    return self.to_object(result[0]) if result else None
async def touch(self, obj: T_DBM):
    """
    Touch the entry to update the last updated date
    :return:
    """
    await self._db.execute(
        f"""
        UPDATE {self._table_name}
        SET updated = NOW()
        WHERE {self.__primary_key} = {self._get_primary_key_value_sql(obj)};
        """
    )

async def touch_many_by_id(self, ids: list[Id]):
    """
    Touch the entries to update the last updated date
    :return:
    """
    # No-op for an empty id list (avoids an invalid "IN ()" clause).
    if len(ids) == 0:
        return
    await self._db.execute(
        f"""
        UPDATE {self._table_name}
        SET updated = NOW()
        WHERE {self.__primary_key} IN ({", ".join([str(x) for x in ids])});
        """
    )
async def _build_create_statement(self, obj: T_DBM, skip_editor=False) -> str:
    """Render an INSERT ... RETURNING statement for *obj*; optionally without the EditorId column."""
    allowed_fields = [x for x in self.__attributes.keys() if x not in self.__ignored_attributes]
    fields = ", ".join([self.__db_names[x] for x in allowed_fields])
    # Prepend the EditorId column/value pair unless the caller explicitly skips it.
    fields = f"{'EditorId' if not skip_editor else ''}{f', {fields}' if not skip_editor and len(fields) > 0 else f'{fields}'}"
    values = ", ".join([self._get_value_sql(getattr(obj, x)) for x in allowed_fields])
    values = f"{await self._get_editor_id(obj) if not skip_editor else ''}{f', {values}' if not skip_editor and len(values) > 0 else f'{values}'}"
    return f"""
    INSERT INTO {self._table_name} (
        {fields}
    ) VALUES (
        {values}
    )
    RETURNING {self.__primary_key};
    """

async def create(self, obj: T_DBM, skip_editor=False) -> int:
    """Insert *obj* and return the new primary key value."""
    self._logger.debug(f"create {type(obj).__name__} {obj.__dict__}")
    result = await self._db.execute(await self._build_create_statement(obj, skip_editor))
    # RETURNING yields one row whose first column is the primary key.
    return self._get_value_from_sql(self.__primary_key_type, result[0][0])

async def create_many(self, objs: list[T_DBM], skip_editor=False) -> list[int]:
    """Insert all *objs* in a single round trip and return their new primary keys."""
    if len(objs) == 0:
        return []
    self._logger.debug(f"create many {type(objs[0]).__name__} {len(objs)} {[x.__dict__ for x in objs]}")
    # Concatenate one INSERT per object into a single script.
    query = ""
    for obj in objs:
        query += await self._build_create_statement(obj, skip_editor)
    result = await self._db.execute(query)
    return [self._get_value_from_sql(self.__primary_key_type, x[0]) for x in result]
async def _build_update_statement(self, obj: T_DBM, skip_editor=False) -> str:
    """Render an UPDATE statement for *obj*; optionally without touching EditorId."""
    allowed_fields = [x for x in self.__attributes.keys() if x not in self.__ignored_attributes]
    fields = ", ".join(
        [f"{self.__db_names[x]} = {self._get_value_sql(getattr(obj, x, None))}" for x in allowed_fields]
    )
    # Prepend the EditorId assignment unless the caller explicitly skips it.
    fields = f"{f'EditorId = {await self._get_editor_id(obj)}' if not skip_editor else ''}{f', {fields}' if not skip_editor and len(fields) > 0 else f'{fields}'}"
    return f"""
    UPDATE {self._table_name}
    SET {fields}
    WHERE {self.__primary_key} = {self._get_primary_key_value_sql(obj)};
    """

async def update(self, obj: T_DBM, skip_editor=False):
    """Persist the current state of *obj*."""
    self._logger.debug(f"update {type(obj).__name__} {obj.__dict__}")
    await self._db.execute(await self._build_update_statement(obj, skip_editor))

async def update_many(self, objs: list[T_DBM], skip_editor=False):
    """Persist all *objs* in a single round trip; no-op for an empty list."""
    if len(objs) == 0:
        return
    self._logger.debug(f"update many {type(objs[0]).__name__} {len(objs)} {[x.__dict__ for x in objs]}")
    query = ""
    for obj in objs:
        query += await self._build_update_statement(obj, skip_editor)
    await self._db.execute(query)
async def _build_delete_statement(self, obj: T_DBM, hard_delete: bool = False) -> str:
    """Render either a hard DELETE or a soft-delete UPDATE (Deleted = true) for *obj*."""
    if hard_delete:
        return f"""
        DELETE FROM {self._table_name}
        WHERE {self.__primary_key} = {self._get_primary_key_value_sql(obj)};
        """
    return f"""
    UPDATE {self._table_name}
    SET EditorId = {await self._get_editor_id(obj)},
        Deleted = true
    WHERE {self.__primary_key} = {self._get_primary_key_value_sql(obj)};
    """

async def delete(self, obj: T_DBM, hard_delete: bool = False):
    """Delete *obj*: soft delete by default, hard delete on request."""
    self._logger.debug(f"delete {type(obj).__name__} {obj.__dict__}")
    await self._db.execute(await self._build_delete_statement(obj, hard_delete))

async def delete_many(self, objs: list[T_DBM], hard_delete: bool = False):
    """Delete all *objs* in a single round trip; no-op for an empty list."""
    if len(objs) == 0:
        return
    self._logger.debug(f"delete many {type(objs[0]).__name__} {len(objs)} {[x.__dict__ for x in objs]}")
    query = ""
    for obj in objs:
        query += await self._build_delete_statement(obj, hard_delete)
    await self._db.execute(query)
async def _build_restore_statement(self, obj: T_DBM) -> str:
    """Render the UPDATE that reverts a soft delete (Deleted = false) for *obj*."""
    return f"""
    UPDATE {self._table_name}
    SET EditorId = {await self._get_editor_id(obj)},
        Deleted = false
    WHERE {self.__primary_key} = {self._get_primary_key_value_sql(obj)};
    """

async def restore(self, obj: T_DBM):
    """Revert a soft delete for *obj*."""
    self._logger.debug(f"restore {type(obj).__name__} {obj.__dict__}")
    await self._db.execute(await self._build_restore_statement(obj))

async def restore_many(self, objs: list[T_DBM]):
    """Revert soft deletes for all *objs* in a single round trip; no-op for an empty list."""
    if len(objs) == 0:
        return
    self._logger.debug(f"restore many {type(objs[0]).__name__} {len(objs)} {objs[0].__dict__}")
    query = ""
    for obj in objs:
        query += await self._build_restore_statement(obj)
    await self._db.execute(query)
async def _prepare_query(
    self,
    filters: AttributeFilters = None,
    sorts: AttributeSorts = None,
    take: int = None,
    skip: int = None,
    for_count=False,
) -> list[dict]:
    """
    Prepares and executes a query using the SQLBuilder with the given parameters.
    :param filters: Conditions to filter the query.
    :param sorts: Sorting attributes and directions.
    :param take: Limit the number of results.
    :param skip: Offset the results.
    :param for_count: Select COUNT(*) instead of the row data.
    :return: Query result as a list of dictionaries.
    """
    # Temp tables actually referenced by filters/sorts; only those get used.
    external_table_deps = []
    builder = SQLSelectBuilder(self._table_name, self.__primary_key)
    for temp in self._external_fields:
        builder.with_temp_table(self._external_fields[temp])
    if for_count:
        builder.with_attribute("COUNT(*)", ignore_table_name=True)
    else:
        builder.with_attribute("*")
    # Join every registered foreign table.
    for attr in self.__foreign_tables:
        table, join_condition = self.__foreign_tables[attr]
        builder.with_left_join(table, join_condition)
    if filters:
        await self._build_conditions(builder, filters, external_table_deps)
    if sorts:
        self._build_sorts(builder, sorts, external_table_deps)
    if take:
        builder.with_limit(take)
    if skip:
        builder.with_offset(skip)
    for external_table in external_table_deps:
        builder.use_temp_table(external_table)
    query = await builder.build()
    return await self._db.select_map(query)
async def _build_conditions(
    self,
    builder: SQLSelectBuilder,
    filters: AttributeFilters,
    external_table_deps: list[str],
):
    """
    Builds SQL conditions from GraphQL-like filters and adds them to the SQLBuilder.
    :param builder: The SQLBuilder instance to add conditions to.
    :param filters: GraphQL-like filter structure.
    :param external_table_deps: List to store external table dependencies.
    """
    if not isinstance(filters, list):
        filters = [filters]
    for filter_group in filters:
        sql_conditions = self._graphql_to_sql_conditions(filter_group, external_table_deps)
        for attr, operator, value in sql_conditions:
            # Map object attributes to their foreign key column when referenced.
            if attr in self.__foreign_table_keys:
                attr = self.__foreign_table_keys[attr]
            recursive_join = self._get_recursive_reference_join(attr)
            if recursive_join is not None:
                builder.with_left_join(*recursive_join)
            # Conditions on external (temp table) fields register that table as a dependency.
            external_table = self._get_external_field_key(attr)
            if external_table is not None:
                external_table_deps.append(external_table)
            if operator == "fuzzy":
                # attr already carries the fully rendered fuzzy condition string.
                builder.with_levenshtein_condition(attr)
            elif operator in [
                "IS NULL",
                "IS NOT NULL",
            ]:  # operator without value
                builder.with_condition(
                    attr,
                    operator,
                    [
                        x[0]
                        for fdao in self.__foreign_dao
                        for x in self.__foreign_dao[fdao].__foreign_tables.values()
                    ],
                )
            else:
                # Date columns are compared through their text representation.
                if attr in self.__date_attributes or String.to_snake_case(attr) in self.__date_attributes:
                    attr = self._attr_from_date_to_char(f"{self._table_name}.{attr}")
                builder.with_value_condition(
                    attr,
                    operator,
                    self._get_value_sql(value),
                    [
                        x[0]
                        for fdao in self.__foreign_dao
                        for x in self.__foreign_dao[fdao].__foreign_tables.values()
                    ],
                )
def _graphql_to_sql_conditions(
    self, graphql_structure: dict, external_table_deps: list[str]
) -> list[tuple[str, str, Any]]:
    """
    Converts a GraphQL-like structure to SQL conditions.
    :param graphql_structure: The GraphQL-like filter structure.
    :param external_table_deps: List to track external table dependencies.
    :return: A list of tuples (attribute, operator, value).
    """
    # GraphQL operator name -> SQL operator.
    operators = {
        "equal": "=",
        "notEqual": "!=",
        "greater": ">",
        "greaterOrEqual": ">=",
        "less": "<",
        "lessOrEqual": "<=",
        "isNull": "IS NULL",
        "isNotNull": "IS NOT NULL",
        "contains": "LIKE",  # Special handling in _graphql_to_sql_conditions
        "notContains": "NOT LIKE",  # Special handling in _graphql_to_sql_conditions
        "startsWith": "LIKE",  # Special handling in _graphql_to_sql_conditions
        "endsWith": "LIKE",  # Special handling in _graphql_to_sql_conditions
        "in": "IN",
        "notIn": "NOT IN",
    }
    conditions = []

    def parse_node(node, parent_key=None, parent_dao=None):
        # Recursive walk over the filter tree; leaves append (attr, operator, value).
        if not isinstance(node, dict):
            return
        # NOTE(review): this list branch is unreachable as written — a list is
        # not a dict, so the guard above already returned. Confirm intent.
        if isinstance(node, list):
            conditions.append((parent_key, "IN", node))
            return
        for key, value in node.items():
            if isinstance(key, property):
                key = key.fget.__name__
            external_fields_table_name_by_parent = self._get_external_field_key(parent_key)
            external_fields_table_name = self._get_external_field_key(key)
            # Prefer the parent's external table when both resolve.
            external_field = (
                external_fields_table_name
                if external_fields_table_name_by_parent is None
                else external_fields_table_name_by_parent
            )
            if key == "fuzzy":
                self._handle_fuzzy_filter_conditions(conditions, external_table_deps, value)
            elif parent_dao is not None and key in parent_dao.__db_names:
                # Qualify the key with the (foreign) parent DAO's table.
                parse_node(value, f"{parent_dao.table_name}.{key}")
                continue
            elif external_field is not None:
                external_table_deps.append(external_field)
                parse_node(value, f"{external_field}.{key}")
            elif parent_key in self.__foreign_table_keys:
                if key in operators:
                    # Operator applied to a foreign key attribute: re-parent onto the fk column.
                    parse_node({key: value}, self.__foreign_table_keys[parent_key])
                    continue
                if parent_key in self.__foreign_dao:
                    foreign_dao = self.__foreign_dao[parent_key]
                    if key in foreign_dao.__foreign_tables:
                        # Descend one level into the referenced DAO's own references.
                        parse_node(
                            value,
                            f"{self.__foreign_tables[parent_key][0]}.{foreign_dao.__foreign_table_keys[key]}",
                            foreign_dao.__foreign_dao[key],
                        )
                        continue
                if parent_key in self.__foreign_tables:
                    parse_node(value, f"{self.__foreign_tables[parent_key][0]}.{key}")
                    continue
                parse_node({parent_key: value})
            elif key in operators:
                operator = operators[key]
                # LIKE-style operators wrap the value with SQL wildcards.
                if key == "contains" or key == "notContains":
                    value = f"%{value}%"
                elif key == "in" or key == "notIn":
                    value = value
                elif key == "startsWith":
                    value = f"{value}%"
                elif key == "endsWith":
                    value = f"%{value}"
                elif key == "isNull" or key == "isNotNull":
                    # isNull/isNotNull accept a bool, possibly wrapped in {"equal": ...}.
                    is_null_value = value.get("equal", None) if isinstance(value, dict) else value
                    if is_null_value is None:
                        operator = operators[key]
                    elif (key == "isNull" and is_null_value) or (key == "isNotNull" and not is_null_value):
                        operator = "IS NULL"
                    else:
                        operator = "IS NOT NULL"
                    # NOTE(review): this appends the null-check here AND falls through
                    # to the append below, producing a duplicate (apparently harmless,
                    # since null operators ignore the value) condition — confirm intent.
                    conditions.append((parent_key, operator, None))
                elif (key == "equal" or key == "notEqual") and value is None:
                    # Comparing against None degrades to a null check.
                    operator = operators["isNull"]
                conditions.append((parent_key, operator, value))
            elif isinstance(value, dict):
                if key in self.__foreign_table_keys:
                    parse_node(value, key)
                elif key in self.__db_names and parent_key is not None:
                    parse_node({f"{parent_key}": value})
                elif key in self.__db_names:
                    parse_node(value, self.__db_names[key])
                else:
                    parse_node(value, key)
            elif value is None:
                conditions.append((self.__db_names[key], "IS NULL", value))
            else:
                # Bare scalar: implicit equality.
                conditions.append((self.__db_names[key], "=", value))

    parse_node(graphql_structure)
    return conditions
def _handle_fuzzy_filter_conditions(self, conditions, external_field_table_deps, sub_values):
    """
    Translate a GraphQL ``fuzzy`` filter node into one grouped SQL condition.

    :param conditions: Mutable list of (attribute, operator, value) tuples; the
        combined fuzzy condition is appended as ``(sql, "fuzzy", None)``.
    :param external_field_table_deps: Mutable list tracking external temp-table
        names the final query must build/join.
    :param sub_values: Raw filter node; must contain ``fields`` (list[str]) and
        ``term`` (str), optionally ``threshold`` (int, default 5).
    :raises ValueError: If ``fields`` or ``term`` is missing/empty.
    """
    # Extract fuzzy filter parameters
    fuzzy_fields = get_value(sub_values, "fields", list[str])
    fuzzy_term = get_value(sub_values, "term", str)
    fuzzy_threshold = get_value(sub_values, "threshold", int, 5)
    if not fuzzy_fields or not fuzzy_term:
        raise ValueError("Fuzzy filter must include 'fields' and 'term'.")
    fuzzy_fields_db_names = []
    # Map fields to their database names
    for fuzzy_field in fuzzy_fields:
        external_fields_table_name = self._get_external_field_key(fuzzy_field)
        if external_fields_table_name is not None:
            # Field lives in an external temp table: qualify with that table and
            # record the dependency so the temp table gets built and joined.
            external_fields_table = self._external_fields[external_fields_table_name]
            fuzzy_fields_db_names.append(f"{external_fields_table.table_name}.{fuzzy_field}")
            external_field_table_deps.append(external_fields_table.table_name)
        elif fuzzy_field in self.__db_names:
            # Native column of this DAO's own table.
            fuzzy_fields_db_names.append(f"{self._table_name}.{self.__db_names[fuzzy_field]}")
        elif fuzzy_field in self.__foreign_tables:
            # NOTE(review): membership is tested against __foreign_tables but the name is
            # resolved via __foreign_table_keys and qualified with the OWN table — looks
            # like it matches on this table's FK column; confirm that is intended.
            fuzzy_fields_db_names.append(f"{self._table_name}.{self.__foreign_table_keys[fuzzy_field]}")
        else:
            # NOTE(review): elsewhere __db_names values are used as whole strings, but
            # here the first element ([0]) is taken — verify this is not a bug.
            fuzzy_fields_db_names.append(self.__db_names[String.to_snake_case(fuzzy_field)][0])
    # Build fuzzy conditions for each field
    fuzzy_conditions = self._build_fuzzy_conditions(fuzzy_fields_db_names, fuzzy_term, fuzzy_threshold)
    # Combine conditions with OR and append to the main conditions
    conditions.append((f"({' OR '.join(fuzzy_conditions)})", "fuzzy", None))
@staticmethod
def _build_fuzzy_conditions(fields: list[str], term: str, threshold: int = 10) -> list[str]:
conditions = []
for field in fields:
conditions.append(f"levenshtein({field}::TEXT, '{term}') <= {threshold}") # Adjust the threshold as needed
return conditions
def _get_external_field_key(self, field_name: str) -> Optional[str]:
    """
    Look up which external-field builder exposes *field_name*.

    A field only counts as external when it is not already one of this DAO's
    own mapped columns.

    :param str field_name: The name of the field to search for.
    :return: The external builder's key if found, otherwise None.
    :rtype: Optional[str]
    """
    if field_name is None:
        return None
    return next(
        (
            key
            for key, builder in self._external_fields.items()
            if field_name in builder.fields and field_name not in self.__db_names
        ),
        None,
    )
def _get_recursive_reference_join(self, attr: str) -> Optional[tuple[str, str]]:
    """
    Resolve join info for a dotted attribute referencing a (possibly nested)
    foreign table.

    :param attr: Attribute such as ``"schema.table.column"``.
    :return: ``(table_name, join_condition)`` when the qualifier names a foreign
        table known to this DAO or one of its foreign DAOs; ``None`` when the
        attribute is unqualified or belongs to this DAO's own table.
    """
    table_name = ".".join(attr.split(".")[:-1])
    if table_name in ("", self._table_name):
        return None
    # Gather join candidates: own foreign tables (excluding self-references)
    # first, then those of every directly referenced DAO (later entries win,
    # matching the original dict-comprehension order).
    candidates: dict[str, str] = {}
    for entry in self.__foreign_tables.values():
        if entry[0] != self._table_name:
            candidates[entry[0]] = entry[1]
    for fdao in self.__foreign_dao:
        for entry in self.__foreign_dao[fdao].__foreign_tables.values():
            candidates[entry[0]] = entry[1]
    if table_name not in candidates:
        return None
    return table_name, candidates[table_name]
def _build_sorts(
    self,
    builder: SQLSelectBuilder,
    sorts: AttributeSorts,
    external_table_deps: list[str],
):
    """
    Resolves complex sorting structures into SQL-compatible sorting conditions.
    Tracks external table dependencies.
    :param builder: The SQLBuilder instance to add sorting to.
    :param sorts: Sorting attributes and directions in a complex structure.
    :param external_table_deps: List to track external table dependencies.
    :raises ValueError: For a direction other than "asc"/"desc", or a node that
        is neither dict nor list.
    """

    def parse_sort_node(node, parent_key=None):
        # Walk the nested sort structure: dict keys are either column names
        # (value is "asc"/"desc") or relation names (value is a nested dict).
        if isinstance(node, dict):
            for key, value in node.items():
                if isinstance(value, dict):
                    # Recursively parse nested structures
                    parse_sort_node(value, key)
                elif isinstance(value, str) and value.lower() in ["asc", "desc"]:
                    external_table = self._get_external_field_key(key)
                    if external_table:
                        # External field: qualify with its temp table and record
                        # the dependency so the table gets joined.
                        external_table_deps.append(external_table)
                        key = f"{external_table}.{key}"
                    if parent_key in self.__foreign_tables:
                        # NOTE(review): applied after the external-table prefix, so an
                        # external key could end up double-qualified — confirm intended.
                        key = f"{self.__foreign_tables[parent_key][0]}.{key}"
                    builder.with_order_by(key, value.upper())
                else:
                    raise ValueError(f"Invalid sort direction: {value}")
        elif isinstance(node, list):
            for item in node:
                parse_sort_node(item)
        else:
            raise ValueError(f"Invalid sort structure: {node}")

    parse_sort_node(sorts)
def _get_value_sql(self, value: Any) -> str:
if isinstance(value, str):
if value.lower() == "null":
return "NULL"
return f"'{value}'"
if isinstance(value, NoneType):
return "NULL"
if value is None:
return "NULL"
if isinstance(value, Enum):
return f"'{value.value}'"
if isinstance(value, bool):
return "true" if value else "false"
if isinstance(value, list):
if len(value) == 0:
return "()"
return f"({', '.join([self._get_value_sql(x) for x in value])})"
if isinstance(value, datetime.datetime):
if value.tzinfo is None:
value = value.replace(tzinfo=datetime.timezone.utc)
return f"'{value.strftime(DATETIME_FORMAT)}'"
return str(value)
@staticmethod
def _get_value_from_sql(cast_type: type, value: Any) -> Optional[T]:
    """
    Get the value from the query result and cast it to the correct type
    :param type cast_type:
    :param Any value:
    :return Optional[T]: Casted value; ``None`` for SQL NULL (a ``None`` value
        or any string containing "NULL")
    """
    is_sql_null = value is None or (isinstance(value, str) and "NULL" in value)
    if is_sql_null:
        return None
    return value if isinstance(value, cast_type) else cast_type(value)
def _get_primary_key_value_sql(self, obj: T_DBM) -> str:
    """Render *obj*'s primary-key value as a SQL literal (quoted when a str)."""
    value = getattr(obj, self.__primary_key)
    return f"'{value}'" if isinstance(value, str) else value
@staticmethod
def _attr_from_date_to_char(attr: str) -> str:
return f"TO_CHAR({attr}, 'YYYY-MM-DD HH24:MI:SS.US TZ')"
@staticmethod
async def _get_editor_id(obj: T_DBM):
editor_id = obj.editor_id
# if editor_id is None:
# user = get_user()
# if user is not None:
# editor_id = user.id
return editor_id if editor_id is not None else "NULL"

View File

@@ -1,53 +0,0 @@
from abc import ABC, abstractmethod
from typing import Any
from cpl.database.model.database_settings import DatabaseSettings
class DBContextABC(ABC):
    r"""ABC for the :class:`cpl.database.context.database_context.DatabaseContext`"""

    @abstractmethod
    def connect(self, database_settings: DatabaseSettings):
        r"""Connects to a database by connection settings

        Parameter:
            database_settings :class:`cpl.database.database_settings.DatabaseSettings`
        """

    @abstractmethod
    async def execute(self, statement: str, args=None, multi=True) -> list[list]:
        r"""Runs SQL Statements

        Parameter:
            statement: :class:`str`
            args: :class:`list` | :class:`tuple` | :class:`dict` | :class:`None`
            multi: :class:`bool` -- when True, statement may contain several
                ``;``-separated statements that are executed one by one

        Returns:
            list: Fetched rows (one list per row); empty for statements that
            produce no result set
        """

    @abstractmethod
    async def select_map(self, statement: str, args=None) -> list[dict]:
        r"""Runs SQL SELECT statements and returns rows as dictionaries

        Parameter:
            statement: :class:`str`
            args: :class:`list` | :class:`tuple` | :class:`dict` | :class:`None`

        Returns:
            list: One dict per row, keyed by column name
        """

    @abstractmethod
    async def select(self, statement: str, args=None) -> list[str] | list[tuple] | list[Any]:
        r"""Runs SQL SELECT statements and returns raw driver rows
        (the original docstring said "dictionaries"; that is ``select_map``)

        Parameter:
            statement: :class:`str`
            args: :class:`list` | :class:`tuple` | :class:`dict` | :class:`None`

        Returns:
            list: Fetched rows as returned by the driver
        """

View File

@@ -1,30 +0,0 @@
from datetime import datetime
from typing import Optional
from cpl.core.typing import Id, SerialId
from cpl.database.abc.db_model_abc import DbModelABC
class DbJoinModelABC[T](DbModelABC[T]):
def __init__(
self,
id: Id,
source_id: Id,
foreign_id: Id,
deleted: bool = False,
editor_id: Optional[SerialId] = None,
created: Optional[datetime] = None,
updated: Optional[datetime] = None,
):
DbModelABC.__init__(self, id, deleted, editor_id, created, updated)
self._source_id = source_id
self._foreign_id = foreign_id
@property
def source_id(self) -> Id:
return self._source_id
@property
def foreign_id(self) -> Id:
return self._foreign_id

View File

@@ -1,79 +0,0 @@
from abc import ABC
from datetime import datetime, timezone
from typing import Optional, Generic
from cpl.core.typing import Id, SerialId, T
class DbModelABC(ABC, Generic[T]):
    """
    Common base for database models: id, soft-delete flag, editor reference and
    created/updated audit timestamps.
    """

    def __init__(
        self,
        id: Id,
        deleted: bool = False,
        editor_id: Optional[SerialId] = None,
        created: Optional[datetime] = None,
        updated: Optional[datetime] = None,
    ):
        self._id = id
        self._deleted = deleted
        self._editor_id = editor_id
        # Fix: keep datetimes as datetime objects. Previously the defaults were
        # datetime.now(...).isoformat() STRINGS, contradicting the `-> datetime`
        # property annotations; to_dict() still emits ISO strings either way.
        self._created = created if created is not None else datetime.now(timezone.utc)
        self._updated = updated if updated is not None else datetime.now(timezone.utc)

    @property
    def id(self) -> Id:
        return self._id

    @property
    def deleted(self) -> bool:
        """Soft-delete flag; rows are flagged rather than removed."""
        return self._deleted

    @deleted.setter
    def deleted(self, value: bool):
        self._deleted = value

    @property
    def editor_id(self) -> Optional[SerialId]:
        """Id of the user who last edited the row; may be None."""
        return self._editor_id

    @editor_id.setter
    def editor_id(self, value: SerialId):
        self._editor_id = value

    # TODO(review): an async `editor` property resolving _editor_id through the
    # user DAO was disabled here; restore when the DAO dependency is available.

    @property
    def created(self) -> datetime:
        return self._created

    @property
    def updated(self) -> datetime:
        return self._updated

    @updated.setter
    def updated(self, value: datetime):
        self._updated = value

    def to_dict(self) -> dict:
        """
        Serialize the private ``_name`` attributes to a dict of strings.

        Keys are attribute names with ALL underscores removed
        (``_editor_id`` -> ``editorid``); datetimes become ISO-8601 strings.
        NOTE(review): underscore stripping flattens snake_case — confirm
        consumers really expect ``editorid`` rather than ``editor_id``.
        """
        result = {}
        for name, value in self.__dict__.items():
            # Serialize only single-underscore private fields.
            if not name.startswith("_") or name.endswith("_"):
                continue
            if isinstance(value, datetime):
                value = value.isoformat()
            if not isinstance(value, str):
                value = str(value)
            result[name.replace("_", "")] = value
        return result

View File

@@ -1,25 +0,0 @@
from abc import abstractmethod
from datetime import datetime
from typing import Type
from cpl.database.abc.data_access_object_abc import DataAccessObjectABC
from cpl.database.abc.db_model_abc import DbModelABC
from cpl.database.internal_tables import InternalTables
class DbModelDaoABC[T_DBM](DataAccessObjectABC[T_DBM]):
    """DAO base pre-registering the audit columns shared by all DbModelABC models."""

    @abstractmethod
    def __init__(self, source: str, model_type: Type[T_DBM], table_name: str):
        DataAccessObjectABC.__init__(self, source, model_type, table_name)
        # id is generated by the database, so it is excluded from INSERT/UPDATE.
        self.attribute(DbModelABC.id, int, ignore=True)
        self.attribute(DbModelABC.deleted, bool)
        self.attribute(DbModelABC.editor_id, int, ignore=True)  # handled by db trigger
        self.reference(
            "editor", "id", DbModelABC.editor_id, InternalTables.users
        )  # not relevant for updates due to editor_id
        self.attribute(DbModelABC.created, datetime, ignore=True)  # handled by db trigger
        self.attribute(DbModelABC.updated, datetime, ignore=True)  # handled by db trigger

View File

@@ -1 +0,0 @@
# Canonical timestamp layout used when rendering datetimes into SQL literals,
# e.g. "2024-01-31 12:34:56.789012 +0000".
DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S.%f %z"

View File

@@ -1,8 +0,0 @@
from cpl.core.log import Logger
from cpl.core.typing import Source
class DBLogger(Logger):
    """Logger preconfigured for the database subsystem (log channel "db")."""

    def __init__(self, source: Source):
        Logger.__init__(self, source, "db")

View File

@@ -1,15 +0,0 @@
from cpl.database.model.server_type import ServerTypes, ServerType
class _InternalTablesMeta(type):
    """
    Metaclass exposing the internal table names as class-level properties.

    Fix: the original chained ``@classmethod`` + ``@property``, which was
    deprecated in Python 3.11 and removed in 3.13. Metaclass properties keep
    the existing ``InternalTables.users`` attribute-access interface.
    NOTE(review): access via an *instance* of InternalTables would no longer
    resolve these — confirm all callers use the class directly.
    """

    @property
    def users(cls) -> str:
        # Postgres uses a schema-qualified name; MySQL has a flat namespace.
        return "administration.users" if ServerType.server_type is ServerTypes.POSTGRES else "users"

    @property
    def executed_migrations(cls) -> str:
        return (
            "system._executed_migrations"
            if ServerType.server_type is ServerTypes.POSTGRES
            else "_executed_migrations"
        )


class InternalTables(metaclass=_InternalTablesMeta):
    """Static accessor for well-known internal table names (see metaclass)."""

View File

@@ -1,3 +0,0 @@
from .database_settings import DatabaseSettings
from .migration import Migration
from .server_type import ServerTypes

View File

@@ -1,12 +0,0 @@
class Migration:
    """Immutable pairing of a migration's name and its SQL script."""

    def __init__(self, name: str, script: str):
        self._name, self._script = name, script

    @property
    def name(self) -> str:
        """Migration identifier (typically the script's file stem)."""
        return self._name

    @property
    def script(self) -> str:
        """Raw SQL contents of the migration."""
        return self._script

View File

@@ -1,21 +0,0 @@
from enum import Enum
class ServerTypes(Enum):
    """Supported database server flavors."""

    POSTGRES = "postgres"
    MYSQL = "mysql"


class _ServerTypeMeta(type):
    """
    Metaclass exposing the configured server type as a class property.

    Fix: the original chained ``@classmethod`` + ``@property``, which was
    deprecated in Python 3.11 and removed in 3.13; a metaclass property keeps
    the ``ServerType.server_type`` attribute-access interface working.
    """

    @property
    def server_type(cls) -> ServerTypes:
        # Raise instead of assert: asserts vanish under `python -O`.
        if cls._server_type is None:
            raise RuntimeError("Server type is not set")
        return cls._server_type


class ServerType(metaclass=_ServerTypeMeta):
    """Process-wide holder for the configured :class:`ServerTypes` value."""

    _server_type: ServerTypes = None

    @classmethod
    def set_server_type(cls, server_type: ServerTypes):
        """
        Set the global server type.

        :raises ValueError: If *server_type* is None.
        :raises TypeError: If *server_type* is not a :class:`ServerTypes` member.
            (Previously both were ``assert``s, which are stripped under ``-O``.)
        """
        if server_type is None:
            raise ValueError("server_type must not be None")
        if not isinstance(server_type, ServerTypes):
            raise TypeError(f"Expected ServerType but got {type(server_type)}")
        cls._server_type = server_type

View File

@@ -1,84 +0,0 @@
import uuid
from typing import Any, List, Dict, Tuple, Union
from mysql.connector import Error as MySQLError, PoolError
from cpl.core.configuration import Configuration
from cpl.core.environment import Environment
from cpl.database.abc.db_context_abc import DBContextABC
from cpl.database.db_logger import DBLogger
from cpl.database.model.database_settings import DatabaseSettings
from cpl.database.mysql.mysql_pool import MySQLPool
_logger = DBLogger(__name__)
class DBContext(DBContextABC):
    """MySQL implementation of DBContextABC backed by MySQLPool, with a simple
    bounded retry scheme for failed SELECTs."""

    def __init__(self):
        DBContextABC.__init__(self)
        self._pool: MySQLPool = None
        # Failure counter used by the retry logic below.
        # NOTE(review): never reset after a success, so after three lifetime
        # failures every later retry path raises immediately.
        self._fails = 0
        # Connects eagerly from global configuration on construction.
        self.connect(Configuration.get(DatabaseSettings))

    def connect(self, database_settings: DatabaseSettings):
        """Create the connection pool; logs fatally instead of raising on failure."""
        try:
            _logger.debug("Connecting to database")
            self._pool = MySQLPool(
                database_settings,
            )
            _logger.info("Connected to database")
        except Exception as e:
            _logger.fatal("Connecting to database failed", e)

    async def execute(self, statement: str, args=None, multi=True) -> List[List]:
        """Run one or more statements and return fetched rows (no retry here)."""
        _logger.trace(f"execute {statement} with args: {args}")
        return await self._pool.execute(statement, args, multi)

    async def select_map(self, statement: str, args=None) -> List[Dict]:
        """SELECT returning dict rows; retries up to 3 times on driver/pool errors."""
        _logger.trace(f"select {statement} with args: {args}")
        try:
            return await self._pool.select_map(statement, args)
        except (MySQLError, PoolError) as e:
            # After the third recorded failure, surface a user-facing error with a UID.
            if self._fails >= 3:
                _logger.error(f"Database error caused by `{statement}`", e)
                uid = uuid.uuid4()
                raise Exception(
                    f"Query failed three times with {type(e).__name__}. Contact an admin with the UID: {uid}"
                )
            _logger.error(f"Database error caused by `{statement}`", e)
            self._fails += 1
            try:
                _logger.debug("Retry select")
                return await self.select_map(statement, args)
            except Exception as e:
                # NOTE(review): retry errors are swallowed and [] is returned —
                # callers cannot distinguish "no rows" from "query failed".
                pass
            return []
        except Exception as e:
            # Non-driver errors are logged and re-raised unchanged.
            _logger.error(f"Database error caused by `{statement}`", e)
            raise e

    async def select(self, statement: str, args=None) -> Union[List[str], List[Tuple], List[Any]]:
        """SELECT returning raw row tuples; same retry behavior as select_map."""
        _logger.trace(f"select {statement} with args: {args}")
        try:
            return await self._pool.select(statement, args)
        except (MySQLError, PoolError) as e:
            if self._fails >= 3:
                _logger.error(f"Database error caused by `{statement}`", e)
                uid = uuid.uuid4()
                raise Exception(
                    f"Query failed three times with {type(e).__name__}. Contact an admin with the UID: {uid}"
                )
            _logger.error(f"Database error caused by `{statement}`", e)
            self._fails += 1
            try:
                _logger.debug("Retry select")
                return await self.select(statement, args)
            except Exception as e:
                # NOTE(review): same silent-swallow pattern as select_map above.
                pass
            return []
        except Exception as e:
            _logger.error(f"Database error caused by `{statement}`", e)
            raise e

View File

@@ -1,105 +0,0 @@
from typing import Optional, Any
import sqlparse
import aiomysql
from cpl.core.environment import Environment
from cpl.database.db_logger import DBLogger
from cpl.database.model import DatabaseSettings
_logger = DBLogger(__name__)
class MySQLPool:
    """
    Create a pool when connecting to MySQL, which will decrease the time spent in
    requesting connection, creating connection, and closing connection.
    """

    def __init__(self, database_settings: DatabaseSettings):
        self._db_settings = database_settings
        # Created lazily on first use; see _get_pool().
        self.pool: Optional[aiomysql.Pool] = None

    async def _get_pool(self):
        # Recreate the pool when missing or closed.
        # NOTE(review): relies on the private aiomysql attribute `_closed`.
        if self.pool is None or self.pool._closed:
            try:
                self.pool = await aiomysql.create_pool(
                    host=self._db_settings.host,
                    port=self._db_settings.port,
                    user=self._db_settings.user,
                    password=self._db_settings.password,
                    db=self._db_settings.database,
                    minsize=1,
                    # Pool size is environment-driven, default 1.
                    maxsize=Environment.get("DB_POOL_SIZE", int, 1),
                    autocommit=True,
                )
            except Exception as e:
                _logger.fatal("Failed to connect to the database", e)
                raise
        return self.pool

    @staticmethod
    async def _exec_sql(cursor: Any, query: str, args=None, multi=True):
        # Split multi-statement scripts and run each statement separately.
        # NOTE(review): the same `args` are passed to every split statement —
        # confirm multi-mode is only used for parameterless scripts.
        if multi:
            queries = [str(stmt).strip() for stmt in sqlparse.parse(query) if str(stmt).strip()]
            for q in queries:
                if q.strip() == "":
                    continue
                await cursor.execute(q, args)
        else:
            await cursor.execute(query, args)

    async def execute(self, query: str, args=None, multi=True) -> list[list]:
        """
        Execute a SQL statement, it could be with args and without args. The usage is
        similar to the execute() function in aiomysql.
        :param query: SQL clause
        :param args: args needed by the SQL clause
        :param multi: if the query is a multi-statement
        :return: return result (rows as lists; [] when the statement returns no rows)
        """
        pool = await self._get_pool()
        async with pool.acquire() as con:
            async with con.cursor() as cursor:
                await self._exec_sql(cursor, query, args, multi)
                if cursor.description is not None:  # Query returns rows
                    res = await cursor.fetchall()
                    if res is None:
                        return []
                    return [list(row) for row in res]
                else:
                    return []

    async def select(self, query: str, args=None, multi=True) -> list[str]:
        """
        Execute a SQL statement, it could be with args and without args. The usage is
        similar to the execute() function in aiomysql.
        :param query: SQL clause
        :param args: args needed by the SQL clause
        :param multi: if the query is a multi-statement
        :return: return result (raw driver row tuples)
        """
        pool = await self._get_pool()
        async with pool.acquire() as con:
            async with con.cursor() as cursor:
                await self._exec_sql(cursor, query, args, multi)
                res = await cursor.fetchall()
                return list(res)

    async def select_map(self, query: str, args=None, multi=True) -> list[dict]:
        """
        Execute a SQL statement, it could be with args and without args. The usage is
        similar to the execute() function in aiomysql.
        :param query: SQL clause
        :param args: args needed by the SQL clause
        :param multi: if the query is a multi-statement
        :return: return result (one dict per row, keyed by column name via DictCursor)
        """
        pool = await self._get_pool()
        async with pool.acquire() as con:
            async with con.cursor(aiomysql.DictCursor) as cursor:
                await self._exec_sql(cursor, query, args, multi)
                res = await cursor.fetchall()
                return list(res)

View File

@@ -1,86 +0,0 @@
import uuid
from typing import Any
from psycopg import OperationalError
from psycopg_pool import PoolTimeout
from cpl.core.configuration import Configuration
from cpl.core.environment import Environment
from cpl.database.abc.db_context_abc import DBContextABC
from cpl.database.database_settings import DatabaseSettings
from cpl.database.db_logger import DBLogger
from cpl.database.postgres.postgres_pool import PostgresPool
_logger = DBLogger(__name__)
class DBContext(DBContextABC):
    """PostgreSQL implementation of DBContextABC backed by PostgresPool, with a
    simple bounded retry scheme for failed SELECTs."""

    def __init__(self):
        DBContextABC.__init__(self)
        self._pool: PostgresPool = None
        # Failure counter used by the retry logic below.
        # NOTE(review): never reset after a success (same issue as the MySQL context).
        self._fails = 0
        self.connect(Configuration.get(DatabaseSettings))

    def connect(self, database_settings: DatabaseSettings):
        """Create the connection pool; logs fatally instead of raising on failure."""
        try:
            _logger.debug("Connecting to database")
            # NOTE(review): PostgresPool is called with a pool-size argument here;
            # verify its __init__ actually accepts a second parameter.
            self._pool = PostgresPool(
                database_settings,
                Environment.get("DB_POOL_SIZE", int, 1),
            )
            _logger.info("Connected to database")
        except Exception as e:
            _logger.fatal("Connecting to database failed", e)

    async def execute(self, statement: str, args=None, multi=True) -> list[list]:
        """Run one or more statements and return fetched rows (no retry here)."""
        _logger.trace(f"execute {statement} with args: {args}")
        return await self._pool.execute(statement, args, multi)

    async def select_map(self, statement: str, args=None) -> list[dict]:
        """SELECT returning dict rows; retries up to 3 times on pool/connection errors."""
        _logger.trace(f"select {statement} with args: {args}")
        try:
            return await self._pool.select_map(statement, args)
        except (OperationalError, PoolTimeout) as e:
            # After the third recorded failure, surface a user-facing error with a UID.
            if self._fails >= 3:
                _logger.error(f"Database error caused by `{statement}`", e)
                uid = uuid.uuid4()
                raise Exception(
                    f"Query failed three times with {type(e).__name__}. Contact an admin with the UID: {uid}"
                )
            _logger.error(f"Database error caused by `{statement}`", e)
            self._fails += 1
            try:
                _logger.debug("Retry select")
                return await self.select_map(statement, args)
            except Exception as e:
                # NOTE(review): retry errors are swallowed and [] is returned —
                # callers cannot distinguish "no rows" from "query failed".
                pass
            return []
        except Exception as e:
            _logger.error(f"Database error caused by `{statement}`", e)
            raise e

    async def select(self, statement: str, args=None) -> list[str] | list[tuple] | list[Any]:
        """SELECT returning raw row tuples; same retry behavior as select_map."""
        _logger.trace(f"select {statement} with args: {args}")
        try:
            return await self._pool.select(statement, args)
        except (OperationalError, PoolTimeout) as e:
            if self._fails >= 3:
                _logger.error(f"Database error caused by `{statement}`", e)
                uid = uuid.uuid4()
                raise Exception(
                    f"Query failed three times with {type(e).__name__}. Contact an admin with the UID: {uid}"
                )
            _logger.error(f"Database error caused by `{statement}`", e)
            self._fails += 1
            try:
                _logger.debug("Retry select")
                return await self.select(statement, args)
            except Exception as e:
                # NOTE(review): same silent-swallow pattern as select_map above.
                pass
            return []
        except Exception as e:
            _logger.error(f"Database error caused by `{statement}`", e)
            raise e

View File

@@ -1,123 +0,0 @@
from typing import Optional, Any
import sqlparse
from psycopg import sql
from psycopg_pool import AsyncConnectionPool, PoolTimeout
from cpl.core.environment import Environment
from cpl.database.db_logger import DBLogger
from cpl.database.model import DatabaseSettings
_logger = DBLogger(__name__)
class PostgresPool:
    """
    Async connection-pool wrapper for PostgreSQL.

    The underlying AsyncConnectionPool is created lazily on first use and then
    reused for every statement. (Previously a fresh pool was opened — and, via
    ``async with``, closed — around every single statement, defeating pooling;
    the cached ``self.pool`` attribute existed but was never used.)
    """

    def __init__(self, database_settings: DatabaseSettings, pool_size: Optional[int] = None):
        """
        :param database_settings: Host/port/user/password/database to connect to.
        :param pool_size: Maximum pool size; falls back to the ``DB_POOL_SIZE``
            environment variable (default 1). Fix: the caller already passes
            this second argument, but the old signature only accepted the
            settings object (TypeError at runtime); the parameter is optional
            so single-argument callers keep working.
        """
        self._conninfo = (
            f"host={database_settings.host} "
            f"port={database_settings.port} "
            f"user={database_settings.user} "
            f"password={database_settings.password} "
            f"dbname={database_settings.database}"
        )
        self._max_size = pool_size if pool_size is not None else Environment.get("DB_POOL_SIZE", int, 1)
        self.pool: Optional[AsyncConnectionPool] = None

    async def _get_pool(self) -> AsyncConnectionPool:
        """Create the pool on first use (verifying connectivity), then reuse it."""
        if self.pool is not None and not self.pool.closed:
            return self.pool
        pool = AsyncConnectionPool(
            conninfo=self._conninfo, open=False, min_size=1, max_size=self._max_size
        )
        await pool.open()
        try:
            async with pool.connection() as con:
                await pool.check_connection(con)
        except PoolTimeout as e:
            await pool.close()
            _logger.fatal("Failed to connect to the database", e)
        self.pool = pool
        return pool

    @staticmethod
    async def _exec_sql(cursor: Any, query: str, args=None, multi=True):
        """Execute *query*; when multi, split the script and run each statement.

        NOTE(review): the same ``args`` are passed to every split statement —
        multi mode should only be used for parameterless scripts.
        """
        if multi:
            queries = [str(stmt).strip() for stmt in sqlparse.parse(query) if str(stmt).strip()]
            for q in queries:
                if q.strip() == "":
                    continue
                await cursor.execute(sql.SQL(q), args)
        else:
            await cursor.execute(sql.SQL(query), args)

    async def execute(self, query: str, args=None, multi=True) -> list[list]:
        """
        Execute a SQL statement (with or without args), similar to psycopg's
        execute().

        :param query: SQL clause
        :param args: args needed by the SQL clause
        :param multi: if the query is a multi-statement
        :return: rows as lists; [] for statements that return no result set
        """
        pool = await self._get_pool()
        async with pool.connection() as con:
            async with con.cursor() as cursor:
                await self._exec_sql(cursor, query, args, multi)
                if cursor.description is None:  # statement produced no rows
                    return []
                res = await cursor.fetchall()
                return [list(row) for row in res] if res is not None else []

    async def select(self, query: str, args=None, multi=True) -> list[str]:
        """
        Run a SELECT and return the raw driver row tuples.

        :param query: SQL clause
        :param args: args needed by the SQL clause
        :param multi: if the query is a multi-statement
        :return: fetched rows
        """
        pool = await self._get_pool()
        async with pool.connection() as con:
            async with con.cursor() as cursor:
                await self._exec_sql(cursor, query, args, multi)
                return list(await cursor.fetchall())

    async def select_map(self, query: str, args=None, multi=True) -> list[dict]:
        """
        Run a SELECT and return one dict per row, keyed by column name.

        :param query: SQL clause
        :param args: args needed by the SQL clause
        :param multi: if the query is a multi-statement
        :return: fetched rows as dictionaries
        """
        pool = await self._get_pool()
        async with pool.connection() as con:
            async with con.cursor() as cursor:
                await self._exec_sql(cursor, query, args, multi)
                res = await cursor.fetchall()
                columns = [d.name for d in cursor.description]
                return [dict(zip(columns, row)) for row in res]

View File

@@ -1,154 +0,0 @@
from typing import Optional, Union
from cpl.database._external_data_temp_table_builder import ExternalDataTempTableBuilder
class SQLSelectBuilder:
    """
    Fluent builder for SQL SELECT statements: attributes, joins, WHERE
    conditions, ORDER BY, LIMIT/OFFSET, plus optional external-data temp
    tables whose DDL is prepended to the final query.
    """

    def __init__(self, table_name: str, primary_key: str):
        self._table_name = table_name
        self._primary_key = primary_key
        # Annotations quoted so the class is importable without the builder type.
        self._temp_tables: dict[str, "ExternalDataTempTableBuilder"] = {}
        self._to_use_temp_tables: list[str] = []
        self._attributes: list[str] = []
        self._tables: list[str] = [table_name]
        # join table -> (ON clause, join type); see _add_join.
        self._joins: dict[str, tuple[str, str]] = {}
        self._conditions: list[str] = []
        self._order_by: str = ""
        self._limit: Optional[int] = None
        self._offset: Optional[int] = None

    def with_temp_table(self, temp_table: "ExternalDataTempTableBuilder") -> "SQLSelectBuilder":
        """Register a temp-table builder so it can later be activated by name."""
        self._temp_tables[temp_table.table_name] = temp_table
        return self

    def use_temp_table(self, temp_table_name: str):
        """Activate a previously registered temp table for this query.

        :raises ValueError: If the name was never registered via with_temp_table.
        """
        if temp_table_name not in self._temp_tables:
            raise ValueError(f"Temp table {temp_table_name} not found.")
        self._to_use_temp_tables.append(temp_table_name)

    def with_attribute(self, attr: str, ignore_table_name=False) -> "SQLSelectBuilder":
        """Select a column, qualifying it with the own table unless told not to.

        NOTE(review): the startswith check can misfire when the table name is a
        prefix of another identifier (e.g. "user" vs "users.x").
        """
        if not ignore_table_name and not attr.startswith(self._table_name):
            attr = f"{self._table_name}.{attr}"
        self._attributes.append(attr)
        return self

    def with_foreign_attribute(self, attr: str) -> "SQLSelectBuilder":
        """Select an already-qualified column from a joined/foreign table."""
        self._attributes.append(attr)
        return self

    def with_table(self, table_name: str) -> "SQLSelectBuilder":
        """Add another table to the FROM clause (implicit cross join)."""
        self._tables.append(table_name)
        return self

    def _check_prefix(self, attr: str, foreign_tables: list[str]) -> str:
        """Qualify *attr* with the own table unless it already has a known prefix."""
        assert attr is not None
        if "TO_CHAR" in attr:
            # Already a SQL expression, not a bare column.
            return attr
        valid_prefixes = [
            "levenshtein",
            self._table_name,
            *self._joins.keys(),
            *self._temp_tables.keys(),
            *foreign_tables,
        ]
        if not any(attr.startswith(f"{prefix}.") for prefix in valid_prefixes):
            attr = f"{self._table_name}.{attr}"
        return attr

    def with_value_condition(
        self, attr: str, operator: str, value: str, foreign_tables: list[str]
    ) -> "SQLSelectBuilder":
        """Append ``attr operator value`` to WHERE (value must be pre-rendered SQL)."""
        attr = self._check_prefix(attr, foreign_tables)
        self._conditions.append(f"{attr} {operator} {value}")
        return self

    def with_levenshtein_condition(self, condition: str) -> "SQLSelectBuilder":
        """Append a raw (already grouped) fuzzy-match condition."""
        self._conditions.append(condition)
        return self

    def with_condition(self, attr: str, operator: str, foreign_tables: list[str]) -> "SQLSelectBuilder":
        """Append a value-less condition such as ``attr IS NULL``."""
        attr = self._check_prefix(attr, foreign_tables)
        self._conditions.append(f"{attr} {operator}")
        return self

    def with_grouped_conditions(self, conditions: list[str]) -> "SQLSelectBuilder":
        """Append several conditions AND-ed inside one parenthesized group."""
        self._conditions.append(f"({' AND '.join(conditions)})")
        return self

    def _add_join(self, table: str, on: str, join_type: str) -> "SQLSelectBuilder":
        """Register a join; re-joining the same table ANDs the ON clauses.

        Fix: the original merged the clauses and then unconditionally overwrote
        the merge with the bare new clause, silently dropping earlier ON terms.
        """
        if table in self._joins:
            on = f"{self._joins[table][0]} AND {on}"
        self._joins[table] = (on, join_type)
        return self

    def with_left_join(self, table: str, on: str) -> "SQLSelectBuilder":
        return self._add_join(table, on, "LEFT")

    def with_inner_join(self, table: str, on: str) -> "SQLSelectBuilder":
        return self._add_join(table, on, "INNER")

    def with_right_join(self, table: str, on: str) -> "SQLSelectBuilder":
        return self._add_join(table, on, "RIGHT")

    def with_limit(self, limit: int) -> "SQLSelectBuilder":
        self._limit = limit
        return self

    def with_offset(self, offset: int) -> "SQLSelectBuilder":
        self._offset = offset
        return self

    def with_order_by(self, column: Union[str, property], direction: str = "ASC") -> "SQLSelectBuilder":
        """Set ORDER BY; a property object is resolved to its getter's name."""
        if isinstance(column, property):
            column = column.fget.__name__
        self._order_by = f"{column} {direction}"
        return self

    async def _handle_temp_table_use(self, query) -> str:
        """Prepend activated temp-table DDL and LEFT JOIN each on the primary key."""
        new_query = ""
        for temp_table_name in self._to_use_temp_tables:
            temp_table = self._temp_tables[temp_table_name]
            new_query += await self._temp_tables[temp_table_name].build()
            self.with_left_join(
                temp_table.table_name,
                f"{temp_table.join_ref_table}.{self._primary_key} = {temp_table.table_name}.{temp_table.primary_key}",
            )
        return f"{new_query} {query}" if new_query != "" else query

    async def build(self) -> str:
        """Assemble and return the final SELECT statement."""
        query = await self._handle_temp_table_use("")
        attributes = ", ".join(self._attributes) if self._attributes else "*"
        # Hoisted out of the f-string: nested same-quote f-strings need Python 3.12.
        tables = ", ".join(self._tables)
        query += f"SELECT {attributes} FROM {tables}"
        for table, (on, join_type) in self._joins.items():
            query += f" {join_type} JOIN {table} ON {on}"
        if self._conditions:
            query += " WHERE " + " AND ".join(self._conditions)
        if self._order_by:
            query += f" ORDER BY {self._order_by}"
        if self._limit is not None:
            query += f" LIMIT {self._limit}"
        if self._offset is not None:
            query += f" OFFSET {self._offset}"
        return query

View File

@@ -1,18 +0,0 @@
from datetime import datetime
from typing import Optional
from cpl.database.abc import DbModelABC
class ExecutedMigration(DbModelABC):
    """Row model for the executed-migrations bookkeeping table."""

    def __init__(
        self,
        migration_id: str,
        created: Optional[datetime] = None,
        modified: Optional[datetime] = None,
    ):
        # Fix: pass editor_id=None explicitly. DbModelABC's signature is
        # (id, deleted, editor_id, created, updated); previously `created` landed
        # in the editor_id slot and `modified` in created, shifting both values.
        DbModelABC.__init__(self, migration_id, False, None, created, modified)

    @property
    def migration_id(self) -> str:
        """The migration's unique name; doubles as the row's primary key (_id)."""
        return self._id

View File

@@ -1,14 +0,0 @@
from cpl.database import InternalTables
from cpl.database.abc.data_access_object_abc import DataAccessObjectABC
from cpl.database.db_logger import DBLogger
from cpl.database.schema.executed_migration import ExecutedMigration
_logger = DBLogger(__name__)
class ExecutedMigrationDao(DataAccessObjectABC[ExecutedMigration]):
    """DAO for the executed-migrations bookkeeping table."""

    def __init__(self):
        DataAccessObjectABC.__init__(self, __name__, ExecutedMigration, InternalTables.executed_migrations)
        # The migration name itself is the primary key; stored as "migrationId".
        self.attribute(ExecutedMigration.migration_id, str, primary_key=True, db_name="migrationId")

View File

@@ -1,6 +0,0 @@
-- Bookkeeping table: one row per migration script that has been applied (MySQL).
CREATE TABLE IF NOT EXISTS _executed_migrations
(
    -- Script name (file stem) doubles as the primary key.
    migrationId VARCHAR(255) PRIMARY KEY,
    created TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    -- Refreshed automatically by MySQL on every row update.
    updated TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);

View File

@@ -1,26 +0,0 @@
-- Template history triggers for MySQL ("mytable" is a placeholder to copy per
-- table): archive the OLD row into <table>_history before UPDATE/DELETE.
DELIMITER //
CREATE TRIGGER mytable_before_update
BEFORE UPDATE
ON mytable
FOR EACH ROW
BEGIN
    -- Snapshot the pre-update row into the history table.
    INSERT INTO mytable_history
    SELECT OLD.*;
    -- Keep the audit column current on update.
    SET NEW.updated = NOW();
END;
//
DELIMITER ;
DELIMITER //
CREATE TRIGGER mytable_before_delete
BEFORE DELETE
ON mytable
FOR EACH ROW
BEGIN
    -- Snapshot the row into the history table before it disappears.
    INSERT INTO mytable_history
    SELECT OLD.*;
END;
//
DELIMITER ;

View File

@@ -1,47 +0,0 @@
-- Base schemas plus migration bookkeeping and the generic history trigger (PostgreSQL).
CREATE SCHEMA IF NOT EXISTS public;
CREATE SCHEMA IF NOT EXISTS system;
-- One row per applied migration script; the script name is the primary key.
CREATE TABLE IF NOT EXISTS system._executed_migrations
(
    MigrationId VARCHAR(255) PRIMARY KEY,
    Created timestamptz NOT NULL DEFAULT NOW(),
    Updated timestamptz NOT NULL DEFAULT NOW()
);
-- Generic audit trigger: copies the OLD row into <schema>.<table>_history on
-- UPDATE/DELETE and refreshes the "updated" column on UPDATE. Attach it as a
-- BEFORE UPDATE OR DELETE trigger per table.
CREATE OR REPLACE FUNCTION public.history_trigger_function()
    RETURNS TRIGGER AS
$$
DECLARE
    schema_name TEXT;
    history_table_name TEXT;
BEGIN
    -- Construct the name of the history table based on the current table
    schema_name := TG_TABLE_SCHEMA;
    history_table_name := TG_TABLE_NAME || '_history';
    IF (TG_OP = 'INSERT') THEN
        RETURN NEW;
    END IF;
    -- Insert the old row into the history table on UPDATE or DELETE
    IF (TG_OP = 'UPDATE' OR TG_OP = 'DELETE') THEN
        EXECUTE format(
            'INSERT INTO %I.%I SELECT ($1).*',
            schema_name,
            history_table_name
        )
        USING OLD;
    END IF;
    -- For UPDATE, update the Updated column and return the new row
    IF (TG_OP = 'UPDATE') THEN
        NEW.updated := NOW(); -- Update the Updated column
        RETURN NEW;
    END IF;
    -- For DELETE, return OLD to allow the deletion
    IF (TG_OP = 'DELETE') THEN
        RETURN OLD;
    END IF;
END;
$$ LANGUAGE plpgsql;

View File

@@ -1,111 +0,0 @@
import glob
import os
from cpl.database.abc import DBContextABC
from cpl.database.db_logger import DBLogger
from cpl.database.model import Migration
from cpl.database.model.server_type import ServerType, ServerTypes
from cpl.database.schema.executed_migration import ExecutedMigration
from cpl.database.schema.executed_migration_dao import ExecutedMigrationDao
_logger = DBLogger(__name__)
class MigrationService:
    """Discovers SQL migration scripts, tracks which have run, and applies the rest."""

    def __init__(self, db: DBContextABC, executedMigrationDao: ExecutedMigrationDao):
        self._db = db
        self._executedMigrationDao = executedMigrationDao
        self._script_directories: list[str] = []
        # Register the built-in scripts for the active server flavor.
        base_dir = os.path.dirname(os.path.realpath(__file__))
        if ServerType.server_type == ServerTypes.POSTGRES:
            self.with_directory(os.path.join(base_dir, "../scripts/postgres"))
        elif ServerType.server_type == ServerTypes.MYSQL:
            self.with_directory(os.path.join(base_dir, "../scripts/mysql"))

    def with_directory(self, directory: str) -> "MigrationService":
        """Add an extra directory to scan for .sql migration scripts (fluent)."""
        self._script_directories.append(directory)
        return self

    async def _get_migration_history(self) -> list[ExecutedMigration]:
        """Load all executed-migration rows (only the id column is used)."""
        results = await self._db.select(f"SELECT * FROM {self._executedMigrationDao.table_name}")
        return [ExecutedMigration(result[0]) for result in results]

    @staticmethod
    def _load_scripts_by_path(path: str) -> list[Migration]:
        """Read every .sql file in *path* (sorted by name) into Migration objects.

        :raises Exception: If the directory does not exist.
        """
        if not os.path.exists(path):
            raise Exception("Migration path not found")
        migrations = []
        for file in sorted(glob.glob(f"{path}/*")):
            if not file.endswith(".sql"):
                continue
            # Migration name = bare file name without the .sql extension.
            name = os.path.basename(file).removesuffix(".sql")
            # `with` closes the handle; the old explicit f.close() was redundant.
            with open(file, "r") as f:
                script = f.read()
            migrations.append(Migration(name, script))
        return migrations

    def _load_scripts(self) -> list[Migration]:
        """Concatenate scripts from all registered directories, in registration order."""
        migrations = []
        for path in self._script_directories:
            migrations.extend(self._load_scripts_by_path(path))
        return migrations

    async def _get_tables(self):
        """List existing tables (used to detect a fresh/empty database)."""
        # Fix: the original compared the ServerType CLASS to an enum member
        # (`if ServerType == ServerTypes.POSTGRES`), which is always False, so
        # the MySQL branch always ran even on Postgres.
        if ServerType.server_type == ServerTypes.POSTGRES:
            return await self._db.select(
                """
                SELECT tablename
                FROM pg_tables
                WHERE schemaname = 'public';
                """
            )
        return await self._db.select(
            """
            SHOW TABLES;
            """
        )

    async def _execute(self, migrations: list[Migration]):
        """Apply each not-yet-executed migration and record it; failures log fatally."""
        tables = await self._get_tables()
        for migration in migrations:
            try:
                # Skip already-applied migrations (lookup only possible once any tables exist).
                if len(tables) > 0:
                    migration_from_db = await self._executedMigrationDao.find_by_id(migration.name)
                    if migration_from_db is not None:
                        continue
                _logger.debug(f"Running upgrade migration: {migration.name}")
                await self._db.execute(migration.script, multi=True)
                await self._executedMigrationDao.create(ExecutedMigration(migration.name), skip_editor=True)
            except Exception as e:
                # The old message appended `active_statement`, which was always "".
                _logger.fatal(f"Migration failed: {migration.name}", e)

    async def migrate(self):
        """Entry point: load all registered scripts and apply the pending ones."""
        await self._execute(self._load_scripts())

View File

@@ -1,65 +0,0 @@
from datetime import datetime
from typing import TypeVar, Union, Literal, Any
from cpl.database.abc.db_model_abc import DbModelABC
# Type variable for database models handled by filter/sort queries.
T_DBM = TypeVar("T_DBM", bound=DbModelABC)

# Comparison operators accepted for numeric attributes.
NumberFilterOperator = Literal[
    "equal",
    "notEqual",
    "greater",
    "greaterOrEqual",
    "less",
    "lessOrEqual",
    "isNull",
    "isNotNull",
]

# Comparison operators accepted for string attributes.
StringFilterOperator = Literal[
    "equal",
    "notEqual",
    "contains",
    "notContains",
    "startsWith",
    "endsWith",
    "isNull",
    "isNotNull",
]

# Comparison operators accepted for boolean attributes.
BoolFilterOperator = Literal[
    "equal",
    "notEqual",
    "isNull",
    "isNotNull",
]

# Comparison operators accepted for datetime attributes.
DateFilterOperator = Literal[
    "equal",
    "notEqual",
    "greater",
    "greaterOrEqual",
    "less",
    "lessOrEqual",
    "isNull",
    "isNotNull",
]

# Any operator from the per-type operator sets above.
FilterOperator = NumberFilterOperator | StringFilterOperator | BoolFilterOperator | DateFilterOperator

# An attribute may be addressed by name or by the model's property object.
Attribute = str | property

# A single operator -> operand mapping, typed per operand kind.
AttributeCondition = (
    dict[NumberFilterOperator, int]
    | dict[StringFilterOperator, str]
    | dict[BoolFilterOperator, bool]
    | dict[DateFilterOperator, datetime]
)

# Maps an attribute to one condition, a list of conditions, or a raw value.
AttributeFilter = dict[Attribute, list[AttributeCondition | Any] | AttributeCondition | Any]

# One filter mapping or a list of them.
AttributeFilters = list[AttributeFilter] | AttributeFilter

# Maps an attribute to a sort direction.
AttributeSort = dict[Attribute, Literal["asc", "desc"]]

# One sort mapping or a list of them.
AttributeSorts = list[AttributeSort] | AttributeSort

View File

@@ -1,30 +0,0 @@
[build-system]
requires = ["setuptools>=70.1.0", "wheel>=0.43.0"]
build-backend = "setuptools.build_meta"

[project]
name = "cpl-database"
version = "2024.7.0"
description = "CPL database"
# NOTE(review): 'readme' is expected to reference a file (e.g. "README.md")
# or a table; this holds a description string -- confirm intent.
readme ="CPL database package"
requires-python = ">=3.12"
license = { text = "MIT" }
authors = [
    { name = "Sven Heidemann", email = "sven.heidemann@sh-edraft.de" }
]
keywords = ["cpl", "database", "backend", "shared", "library"]
dynamic = ["dependencies", "optional-dependencies"]

[project.urls]
Homepage = "https://www.sh-edraft.de"

[tool.setuptools.packages.find]
where = ["."]
include = ["cpl*"]

# Dependencies are read from the requirements files at build time.
[tool.setuptools.dynamic]
dependencies = { file = ["requirements.txt"] }
optional-dependencies.dev = { file = ["requirements.dev.txt"] }

View File

@@ -1 +0,0 @@
black==25.1.0

View File

@@ -1,8 +0,0 @@
cpl-core
cpl-dependency
psycopg[binary]==3.2.3
psycopg-pool==3.2.4
sqlparse==0.5.3
mysql-connector-python==9.4.0
async-property==0.2.2
aiomysql==0.2.0

View File

@@ -1,7 +0,0 @@
from .scope import Scope
from .scope_abc import ScopeABC
from .service_collection import ServiceCollection
from .service_descriptor import ServiceDescriptor
from .service_lifetime_enum import ServiceLifetimeEnum
from .service_provider import ServiceProvider
from .service_provider_abc import ServiceProviderABC

View File

@@ -1,18 +0,0 @@
from cpl.dependency.scope import Scope
from cpl.dependency.scope_abc import ScopeABC
from cpl.dependency.service_provider_abc import ServiceProviderABC
class ScopeBuilder:
    r"""Builder that produces :class:`cpl.dependency.scope.Scope` instances
    bound to a given service provider."""

    def __init__(self, service_provider: ServiceProviderABC) -> None:
        # Provider handed to every scope this builder creates.
        self._service_provider = service_provider

    def build(self) -> ScopeABC:
        r"""Create and return a new scope.

        Returns:
            Object of type :class:`cpl.dependency.scope.Scope`
        """
        return Scope(self._service_provider)

View File

@@ -1,30 +0,0 @@
[build-system]
requires = ["setuptools>=70.1.0", "wheel>=0.43.0"]
build-backend = "setuptools.build_meta"

[project]
name = "cpl-dependency"
version = "2024.7.0"
description = "CPL dependency"
# NOTE(review): 'readme' is expected to reference a file (e.g. "README.md")
# or a table; this holds a description string -- confirm intent.
readme ="CPL dependency package"
requires-python = ">=3.12"
license = { text = "MIT" }
authors = [
    { name = "Sven Heidemann", email = "sven.heidemann@sh-edraft.de" }
]
keywords = ["cpl", "dependency", "backend", "shared", "library"]
dynamic = ["dependencies", "optional-dependencies"]

[project.urls]
Homepage = "https://www.sh-edraft.de"

[tool.setuptools.packages.find]
where = ["."]
include = ["cpl*"]

# Dependencies are read from the requirements files at build time.
[tool.setuptools.dynamic]
dependencies = { file = ["requirements.txt"] }
optional-dependencies.dev = { file = ["requirements.dev.txt"] }

View File

@@ -1 +0,0 @@
black==25.1.0

View File

@@ -1 +0,0 @@
cpl-core

View File

@@ -1,21 +0,0 @@
from cpl.dependency import ServiceCollection as _ServiceCollection
from .abc.email_client_abc import EMailClientABC
from .email_client import EMailClient
from .email_client_settings import EMailClientSettings
from .email_client_settings_name_enum import EMailClientSettingsNameEnum
from .email_model import EMail
from .mail_logger import MailLogger
def add_mail(collection: _ServiceCollection):
    """Register the mail services (EMailClient singleton, MailLogger) on
    *collection*.

    Installed as a module hook via ServiceCollection.with_module below.
    """
    from cpl.core.console import Console
    from cpl.core.log import LoggerABC

    try:
        collection.add_singleton(EMailClientABC, EMailClient)
        collection.add_transient(LoggerABC, MailLogger)
    except ImportError as e:
        # BUG FIX: message previously said "cpl-translation is not installed",
        # copied from the translation package; this module is cpl-mail.
        Console.error("cpl-mail is not installed", str(e))


_ServiceCollection.with_module(add_mail, __name__)

View File

@@ -1,88 +0,0 @@
import ssl
from smtplib import SMTP
from typing import Optional
from cpl.core.utils.credential_manager import CredentialManager
from cpl.mail.abc.email_client_abc import EMailClientABC
from cpl.mail.email_client_settings import EMailClientSettings
from cpl.mail.email_model import EMail
from cpl.mail.mail_logger import MailLogger
class EMailClient(EMailClientABC):
    r"""Service to send emails via SMTP.

    Parameter:
        logger: :class:`cpl.mail.mail_logger.MailLogger`
            The logger to use
        mail_settings: :class:`cpl.mail.email_client_settings.EMailClientSettings`
            Settings for mailing (host, port, user name, credentials)
    """

    def __init__(self, logger: MailLogger, mail_settings: EMailClientSettings):
        EMailClientABC.__init__(self)
        assert mail_settings is not None, "mail_settings must not be None"

        self._mail_settings = mail_settings
        self._logger = logger
        self._server: Optional[SMTP] = None

        # Connect eagerly so the client is usable right after construction.
        self.create()

    def create(self):
        r"""Creates the SMTP connection."""
        self._logger.trace(f"Started {__name__}.create")
        self.connect()
        self._logger.trace(f"Stopped {__name__}.create")

    def connect(self):
        r"""Open the SMTP connection and upgrade it to TLS.

        Errors are logged, not raised; the client stays unconnected on failure.
        """
        self._logger.trace(f"Started {__name__}.connect")
        try:
            self._logger.debug(f"Try to connect to {self._mail_settings.host}:{self._mail_settings.port}")
            self._server = SMTP(self._mail_settings.host, self._mail_settings.port)
            self._logger.info(f"Connected to {self._mail_settings.host}:{self._mail_settings.port}")
            self._logger.debug("Try to start tls")
            self._server.starttls(context=ssl.create_default_context())
            self._logger.info("Started tls")
        except Exception as e:
            self._logger.error("Cannot connect to mail server", e)
        self._logger.trace(f"Stopped {__name__}.connect")

    def login(self):
        r"""Authenticate against the SMTP server with the configured credentials.

        Errors are logged, not raised.
        """
        self._logger.trace(f"Started {__name__}.login")
        try:
            # BUG FIX: debug/info here were called with __name__ as an extra
            # first positional argument, unlike every other logger call in
            # this class; pass only the message.
            self._logger.debug(
                f"Try to login {self._mail_settings.user_name}@{self._mail_settings.host}:{self._mail_settings.port}"
            )
            self._server.login(
                self._mail_settings.user_name, CredentialManager.decrypt(self._mail_settings.credentials)
            )
            self._logger.info(
                f"Logged on as {self._mail_settings.user_name} to {self._mail_settings.host}:{self._mail_settings.port}"
            )
        except Exception as e:
            self._logger.error("Cannot login to mail server", e)
        self._logger.trace(f"Stopped {__name__}.login")

    def send_mail(self, email: EMail):
        r"""Log in and send *email* to all of its receivers.

        Errors are logged, not raised.
        """
        self._logger.trace(f"Started {__name__}.send_mail")
        try:
            self.login()
            self._logger.debug(f"Try to send email to {email.receiver_list}")
            self._server.sendmail(
                self._mail_settings.user_name, email.receiver_list, email.get_content(self._mail_settings.user_name)
            )
            self._logger.info(f"Sent email to {email.receiver_list}")
        except Exception as e:
            self._logger.error(f"Cannot send mail to {email.receiver_list}", e)
        self._logger.trace(f"Stopped {__name__}.send_mail")

View File

@@ -1,8 +0,0 @@
from cpl.core.log.logger import Logger
from cpl.core.typing import Source
class MailLogger(Logger):
    """Logger specialized for the mail package (fixed "mail" channel)."""

    def __init__(self, source: Source):
        # Delegate to the base Logger with the fixed "mail" channel name.
        super().__init__(source, "mail")

View File

@@ -1,29 +0,0 @@
[build-system]
requires = ["setuptools>=70.1.0", "wheel>=0.43.0"]
build-backend = "setuptools.build_meta"

[project]
name = "cpl-mail"
version = "2024.7.0"
description = "CPL mail"
# NOTE(review): 'readme' is expected to reference a file (e.g. "README.md")
# or a table; this holds a description string -- confirm intent.
readme = "CPL mail package"
requires-python = ">=3.12"
license = { text = "MIT" }
authors = [
    { name = "Sven Heidemann", email = "sven.heidemann@sh-edraft.de" }
]
keywords = ["cpl", "mail", "backend", "shared", "library"]
dynamic = ["dependencies", "optional-dependencies"]

[project.urls]
Homepage = "https://www.sh-edraft.de"

[tool.setuptools.packages.find]
where = ["."]
include = ["cpl*"]

# Dependencies are read from the requirements files at build time.
[tool.setuptools.dynamic]
dependencies = { file = ["requirements.txt"] }
optional-dependencies.dev = { file = ["requirements.dev.txt"] }

View File

@@ -1 +0,0 @@
black==25.1.0

View File

@@ -1 +0,0 @@
cpl-core

View File

@@ -1 +0,0 @@

View File

@@ -1,5 +0,0 @@
from .default_lambda import default_lambda
from .ordered_queryable import OrderedQueryable
from .sequence import Sequence
from .ordered_queryable_abc import OrderedQueryableABC
from .queryable_abc import QueryableABC

View File

@@ -1,2 +0,0 @@
from .enumerable import Enumerable
from .enumerable_abc import EnumerableABC

View File

@@ -1 +0,0 @@
from .list import List

View File

@@ -1,36 +0,0 @@
from cpl.query.enumerable.enumerable_abc import EnumerableABC
from cpl.query.iterable.iterable import Iterable
class List(Iterable):
    r"""Mutable, index-addressable implementation of
    :class:`cpl.query.iterable.iterable.Iterable`."""

    def __init__(self, t: type = None, values: Iterable = None):
        Iterable.__init__(self, t, values)

    def __getitem__(self, *args):
        # Delegate subscript reads to the backing value store.
        return self._values.__getitem__(*args)

    def __setitem__(self, *args):
        # Delegate subscript writes to the backing value store.
        self._values.__setitem__(*args)

    def __delitem__(self, *args):
        # Delegate subscript deletion to the backing value store.
        self._values.__delitem__(*args)

    def to_enumerable(self) -> EnumerableABC:
        r"""Copy this list's elements into a new Enumerable.

        Returns:
            :class:`cpl.query.enumerable.enumerable_abc.EnumerableABC`
        """
        # Local import -- presumably to avoid a circular import at module
        # load time; confirm before hoisting.
        from cpl.query.enumerable.enumerable import Enumerable

        return Enumerable(self._type, self.to_list())

    def to_iterable(self) -> Iterable:
        r"""Copy this list's elements into a plain Iterable.

        Returns:
            :class:`cpl.query.iterable.iterable_abc.IterableABC`
        """
        return Iterable(self._type, self.to_list())

View File

@@ -1,2 +0,0 @@
from .iterable_abc import IterableABC
from .iterable import Iterable

View File

@@ -1,30 +0,0 @@
[build-system]
requires = ["setuptools>=70.1.0", "wheel>=0.43.0"]
build-backend = "setuptools.build_meta"

[project]
name = "cpl-query"
version = "2024.7.0"
description = "CPL query"
# NOTE(review): 'readme' is expected to reference a file (e.g. "README.md")
# or a table; this holds a description string -- confirm intent.
readme ="CPL query package"
requires-python = ">=3.12"
license = { text = "MIT" }
authors = [
    { name = "Sven Heidemann", email = "sven.heidemann@sh-edraft.de" }
]
keywords = ["cpl", "query", "backend", "shared", "library"]
dynamic = ["dependencies", "optional-dependencies"]

[project.urls]
Homepage = "https://www.sh-edraft.de"

[tool.setuptools.packages.find]
where = ["."]
include = ["cpl*"]

# Dependencies are read from the requirements files at build time.
[tool.setuptools.dynamic]
dependencies = { file = ["requirements.txt"] }
optional-dependencies.dev = { file = ["requirements.dev.txt"] }

View File

@@ -1 +0,0 @@
black==25.1.0

View File

@@ -1,22 +0,0 @@
from cpl.dependency import ServiceCollection as _ServiceCollection
from .translate_pipe import TranslatePipe
from .translation_service import TranslationService
from .translation_service_abc import TranslationServiceABC
from .translation_settings import TranslationSettings
def add_translation(collection: _ServiceCollection):
    """Register the translation services (TranslationService singleton,
    TranslatePipe) on *collection*.

    Installed as a module hook via ServiceCollection.with_module below.
    """
    from cpl.core.console import Console
    from cpl.core.pipes import PipeABC

    # NOTE(review): TranslatePipe/TranslationService/TranslationServiceABC
    # are re-imported locally although the module already imports them at the
    # top -- presumably to defer import failures into the try block; confirm.
    from cpl.translation.translate_pipe import TranslatePipe
    from cpl.translation.translation_service import TranslationService
    from cpl.translation.translation_service_abc import TranslationServiceABC

    try:
        collection.add_singleton(TranslationServiceABC, TranslationService)
        collection.add_transient(PipeABC, TranslatePipe)
    except ImportError as e:
        Console.error("cpl-translation is not installed", str(e))


_ServiceCollection.with_module(add_translation, __name__)

View File

@@ -1,30 +0,0 @@
[build-system]
requires = ["setuptools>=70.1.0", "wheel>=0.43.0"]
build-backend = "setuptools.build_meta"

[project]
name = "cpl-translation"
version = "2024.7.0"
description = "CPL translation"
# NOTE(review): 'readme' is expected to reference a file (e.g. "README.md")
# or a table; this holds a description string -- confirm intent.
readme = "CPL translation package"
requires-python = ">=3.12"
license = { text = "MIT" }
authors = [
    { name = "Sven Heidemann", email = "sven.heidemann@sh-edraft.de" }
]
keywords = ["cpl", "translation", "backend", "shared", "library"]
dynamic = ["dependencies", "optional-dependencies"]

[project.urls]
Homepage = "https://www.sh-edraft.de"

[tool.setuptools.packages.find]
where = ["."]
include = ["cpl*"]

# Dependencies are read from the requirements files at build time.
[tool.setuptools.dynamic]
dependencies = { file = ["requirements.txt"] }
optional-dependencies.dev = { file = ["requirements.dev.txt"] }

View File

@@ -1 +0,0 @@
black==25.1.0

View File

@@ -1 +0,0 @@
cpl-core

View File

@@ -0,0 +1,26 @@
# -*- coding: utf-8 -*-

"""
cpl-cli CPL CLI
~~~~~~~~~~~~~~~~~~~

CPL Command Line Interface

:copyright: (c) 2020 - 2024 sh-edraft.de
:license: MIT, see LICENSE for more details.
"""

__title__ = "cpl_cli"
__author__ = "Sven Heidemann"
__license__ = "MIT"
__copyright__ = "Copyright (c) 2020 - 2024 sh-edraft.de"
__version__ = "2024.10.0"

from collections import namedtuple

# Structured package version; keep in sync with __version__ above.
VersionInfo = namedtuple("VersionInfo", "major minor micro")
version_info = VersionInfo(major="2024", minor="10", micro="0")

View File

@@ -0,0 +1,69 @@
from cpl_cli.abc.project_type_abc import ProjectTypeABC
from cpl_cli.configuration import WorkspaceSettings
from cpl_core.utils import String
class Console(ProjectTypeABC):
    """Project type that scaffolds a new console application.

    Collects the file templates (project json, optional license/readme,
    __init__ files, appsettings, and the selected application/startup/main
    sources) for the generator to write to disk.
    """

    def __init__(
        self,
        base_path: str,
        project_name: str,
        workspace: WorkspaceSettings,
        use_application_api: bool,
        use_startup: bool,
        use_service_providing: bool,
        use_async: bool,
        project_file_data: dict,
    ):
        # NOTE(review): flat module imports (no package prefix); presumably
        # resolved relative to the schematic directory at runtime -- confirm
        # before restructuring.
        from project_file import ProjectFile
        from project_file_appsettings import ProjectFileAppsettings
        from project_file_code_application import ProjectFileApplication
        from project_file_code_main import ProjectFileMain
        from project_file_code_startup import ProjectFileStartup
        from project_file_readme import ProjectFileReadme
        from project_file_license import ProjectFileLicense
        from schematic_init import Init

        ProjectTypeABC.__init__(
            self,
            base_path,
            project_name,
            workspace,
            use_application_api,
            use_startup,
            use_service_providing,
            use_async,
            project_file_data,
        )

        # The last path segment of the project name, snake_cased, becomes
        # the source directory.
        project_path = f'{base_path}{String.convert_to_snake_case(project_name.split("/")[-1])}/'
        self.add_template(ProjectFile(project_name.split("/")[-1], project_path, project_file_data))

        # Standalone projects (no workspace) get their own license/readme.
        if workspace is None:
            self.add_template(ProjectFileLicense(""))
            self.add_template(ProjectFileReadme(""))

        self.add_template(Init("", "init", f"{base_path}tests/"))
        self.add_template(Init("", "init", project_path))
        self.add_template(ProjectFileAppsettings(project_path))

        if use_application_api:
            self.add_template(
                ProjectFileApplication(project_path, use_application_api, use_startup, use_service_providing, use_async)
            )

        if use_startup:
            self.add_template(
                ProjectFileStartup(project_path, use_application_api, use_startup, use_service_providing, use_async)
            )

        self.add_template(
            ProjectFileMain(
                project_name.split("/")[-1],
                project_path,
                use_application_api,
                use_startup,
                use_service_providing,
                use_async,
            )
        )

View File

@@ -0,0 +1,13 @@
import json
from cpl_cli.abc.file_template_abc import FileTemplateABC
class ProjectFile(FileTemplateABC):
    """Template that renders a project's <name>.json file from a dict."""

    def __init__(self, name: str, path: str, code: dict):
        FileTemplateABC.__init__(self, "", path, "{}")
        # Target file name derives from the project name.
        self._name = name + ".json"
        self._code = code

    def get_code(self) -> str:
        """Serialize the project configuration as pretty-printed JSON."""
        return json.dumps(self._code, indent=2)

View File

@@ -0,0 +1,29 @@
import textwrap
from cpl_cli.abc.file_template_abc import FileTemplateABC
class ProjectFileAppsettings(FileTemplateABC):
    """Template for the generated project's appsettings.json."""

    def __init__(self, path: str):
        FileTemplateABC.__init__(self, "", path, "{}")
        self._name = "appsettings.json"

    def get_code(self) -> str:
        """Return the default appsettings.json content (time + logging settings)."""
        # NOTE(review): template indentation reconstructed -- original
        # formatting was lost in extraction; confirm against generated output.
        return textwrap.dedent(
            """\
            {
              "TimeFormatSettings": {
                "DateFormat": "%Y-%m-%d",
                "TimeFormat": "%H:%M:%S",
                "DateTimeFormat": "%Y-%m-%d %H:%M:%S.%f",
                "DateTimeLogFormat": "%Y-%m-%d_%H-%M-%S"
              },
              "LoggingSettings": {
                "Path": "logs/",
                "Filename": "log_$start_time.log",
                "ConsoleLogLevel": "ERROR",
                "FileLogLevel": "WARN"
              }
            }
            """
        )

View File

@@ -0,0 +1,56 @@
from cpl_cli.abc.code_file_template_abc import CodeFileTemplateABC
class ProjectFileApplication(CodeFileTemplateABC):
    """Template for the generated project's application.py (ApplicationABC subclass)."""

    def __init__(
        self, path: str, use_application_api: bool, use_startup: bool, use_service_providing: bool, use_async: bool
    ):
        CodeFileTemplateABC.__init__(
            self, "application", path, "", use_application_api, use_startup, use_service_providing, use_async
        )

    def get_code(self) -> str:
        """Return the Application class source, async or sync per the flag.

        NOTE(review): template indentation reconstructed -- original
        formatting was lost in extraction; confirm against generated output.
        """
        import textwrap

        if self._use_async:
            # Async variant: configure() and main() are coroutines.
            return textwrap.dedent(
                """\
                from cpl_core.application import ApplicationABC
                from cpl_core.configuration import ConfigurationABC
                from cpl_core.console import Console
                from cpl_core.dependency_injection import ServiceProviderABC


                class Application(ApplicationABC):

                    def __init__(self, config: ConfigurationABC, services: ServiceProviderABC):
                        ApplicationABC.__init__(self, config, services)

                    async def configure(self):
                        pass

                    async def main(self):
                        Console.write_line('Hello World')
                """
            )

        # Sync variant.
        return textwrap.dedent(
            """\
            from cpl_core.application import ApplicationABC
            from cpl_core.configuration import ConfigurationABC
            from cpl_core.console import Console
            from cpl_core.dependency_injection import ServiceProviderABC


            class Application(ApplicationABC):

                def __init__(self, config: ConfigurationABC, services: ServiceProviderABC):
                    ApplicationABC.__init__(self, config, services)

                def configure(self):
                    pass

                def main(self):
                    Console.write_line('Hello World')
            """
        )

View File

@@ -0,0 +1,107 @@
from cpl_cli.abc.code_file_template_abc import CodeFileTemplateABC
from cpl_core.utils import String
class ProjectFileMain(CodeFileTemplateABC):
    """Template for the generated project's main entry module.

    Three alternative main.py bodies are prebuilt in __init__; get_code()
    selects one based on the application-api/startup/service-providing flags.

    NOTE(review): template indentation/blank lines reconstructed -- original
    formatting was lost in extraction; confirm against generated output.
    """

    def __init__(
        self,
        name: str,
        path: str,
        use_application_api: bool,
        use_startup: bool,
        use_service_providing: bool,
        use_async: bool,
    ):
        CodeFileTemplateABC.__init__(
            self, "main", path, "", use_application_api, use_startup, use_service_providing, use_async
        )

        import textwrap

        # Package prefix for generated imports, e.g. "my_project.".
        import_pkg = f"{String.convert_to_snake_case(name)}."

        # Variant 1: ApplicationBuilder with a Startup class.
        self._main_with_application_host_and_startup = textwrap.dedent(
            f"""\
            {"import asyncio" if self._use_async else ''}
            from cpl_core.application import ApplicationBuilder

            from {import_pkg}application import Application
            from {import_pkg}startup import Startup


            {self._async()}def main():
                app_builder = ApplicationBuilder(Application)
                app_builder.use_startup(Startup)
                {"app: Application = await app_builder.build_async()" if self._use_async else ""}
                {"await app.run_async()" if self._use_async else "app_builder.build().run()"}


            if __name__ == '__main__':
                {"asyncio.run(main())" if self._use_async else "main()"}
            """
        )

        # Variant 2: ApplicationBuilder without Startup.
        self._main_with_application_base = textwrap.dedent(
            f"""\
            {"import asyncio" if self._use_async else ''}
            from cpl_core.application import ApplicationBuilder

            from {import_pkg}application import Application


            {self._async()}def main():
                app_builder = ApplicationBuilder(Application)
                {"app: Application = await app_builder.build_async()" if self._use_async else ""}
                {"await app.run_async()" if self._use_async else "app_builder.build().run()"}


            if __name__ == '__main__':
                {"asyncio.run(main())" if self._use_async else "main()"}
            """
        )

        # Variant 3: plain dependency-injection bootstrap.
        # NOTE(review): the generated code below references ConfigurationABC,
        # Configuration, ServiceCollection, ServiceProviderABC and Console
        # without generating import lines for them -- looks like missing
        # imports in the template; confirm.
        self._main_with_dependency_injection = textwrap.dedent(
            f"""\
            {"import asyncio" if self._use_async else ''}
            from cpl_core.application import ApplicationBuilder


            {self._async()}def configure_configuration() -> ConfigurationABC:
                config = Configuration()
                return config


            {self._async()}def configure_services(config: ConfigurationABC) -> ServiceProviderABC:
                services = ServiceCollection(config)
                return services.build_service_provider()


            {self._async()}def main():
                config = {self._async()}configure_configuration()
                provider = {self._async()}configure_services(config)
                Console.write_line('Hello World')


            if __name__ == '__main__':
                {"asyncio.run(main())" if self._use_async else "main()"}
            """
        )

    def _async(self) -> str:
        # "async " prefix for generated defs/awaits when the project is async.
        if self._use_async:
            return "async "
        return ""

    def get_code(self) -> str:
        """Pick the main.py variant matching the configured flags.

        Falls back to the application-base variant when no flag matches.
        """
        if self._use_application_api and self._use_startup:
            return self._main_with_application_host_and_startup

        if self._use_application_api:
            return self._main_with_application_base

        if self._use_service_providing:
            return self._main_with_dependency_injection

        return self._main_with_application_base

View File

@@ -0,0 +1,34 @@
from cpl_cli.abc.code_file_template_abc import CodeFileTemplateABC
class ProjectFileStartup(CodeFileTemplateABC):
    """Template for the generated project's startup.py (StartupABC subclass)."""

    def __init__(
        self, path: str, use_application_api: bool, use_startup: bool, use_service_providing: bool, use_async: bool
    ):
        CodeFileTemplateABC.__init__(
            self, "startup", path, "", use_application_api, use_startup, use_service_providing, use_async
        )

    def get_code(self) -> str:
        """Return the Startup class source (configuration + service hooks).

        NOTE(review): template indentation reconstructed -- original
        formatting was lost in extraction; confirm against generated output.
        """
        import textwrap

        return textwrap.dedent(
            """\
            from cpl_core.application import StartupABC
            from cpl_core.configuration import ConfigurationABC
            from cpl_core.dependency_injection import ServiceProviderABC, ServiceCollectionABC
            from cpl_core.environment import ApplicationEnvironment


            class Startup(StartupABC):

                def __init__(self):
                    StartupABC.__init__(self)

                def configure_configuration(self, configuration: ConfigurationABC, environment: ApplicationEnvironment) -> ConfigurationABC:
                    return configuration

                def configure_services(self, services: ServiceCollectionABC, environment: ApplicationEnvironment) -> ServiceProviderABC:
                    return services.build_service_provider()
            """
        )

View File

@@ -0,0 +1,66 @@
from cpl_cli.abc.code_file_template_abc import CodeFileTemplateABC
class ProjectFileTestApplication(CodeFileTemplateABC):
    """Template for the generated unittest project's application.py."""

    def __init__(
        self, path: str, use_application_api: bool, use_startup: bool, use_service_providing: bool, use_async: bool
    ):
        CodeFileTemplateABC.__init__(
            self, "application", path, "", use_application_api, use_startup, use_service_providing, use_async
        )

    def get_code(self) -> str:
        """Return the test Application class source, async or sync per the flag.

        NOTE(review): template indentation reconstructed -- original
        formatting was lost in extraction; confirm against generated output.
        """
        import textwrap

        if self._use_async:
            # Async variant: configure() and main() are coroutines.
            return textwrap.dedent(
                """\
                import unittest
                from unittest import TestSuite

                from cpl_core.application import ApplicationABC
                from cpl_core.configuration import ConfigurationABC
                from cpl_core.dependency_injection import ServiceProviderABC
                from unittests.test_case import TestCase


                class Application(ApplicationABC):

                    def __init__(self, config: ConfigurationABC, services: ServiceProviderABC):
                        ApplicationABC.__init__(self, config, services)
                        self._suite: TestSuite = unittest.TestSuite()

                    async def configure(self):
                        self._suite.addTest(TestCase('test_equal'))

                    async def main(self):
                        runner = unittest.TextTestRunner()
                        runner.run(self._suite)
                """
            )

        # Sync variant.
        return textwrap.dedent(
            """\
            import unittest
            from unittest import TestSuite

            from cpl_core.application import ApplicationABC
            from cpl_core.configuration import ConfigurationABC
            from cpl_core.dependency_injection import ServiceProviderABC
            from unittests.test_case import TestCase


            class Application(ApplicationABC):

                def __init__(self, config: ConfigurationABC, services: ServiceProviderABC):
                    ApplicationABC.__init__(self, config, services)
                    self._suite: TestSuite = unittest.TestSuite()

                def configure(self):
                    self._suite.addTest(TestCase('test_equal'))

                def main(self):
                    runner = unittest.TextTestRunner()
                    runner.run(self._suite)
            """
        )

Some files were not shown because too many files have changed in this diff Show More