Compare commits

..

16 Commits

Author SHA1 Message Date
4625b626e6 Added dao base
All checks were successful
Build on push / prepare (push) Successful in 8s
Build on push / query (push) Successful in 16s
Build on push / core (push) Successful in 23s
Build on push / translation (push) Successful in 14s
Build on push / mail (push) Successful in 14s
2025-09-17 00:12:25 +02:00
58dbd3ed1e Cleanup for mysql
All checks were successful
Build on push / prepare (push) Successful in 8s
Build on push / core (push) Successful in 19s
Build on push / query (push) Successful in 18s
Build on push / translation (push) Successful in 15s
Build on push / mail (push) Successful in 17s
2025-09-16 20:21:33 +02:00
cd7dfaf2b4 Fixed spinner thread & remove log thread
All checks were successful
Build on push / prepare (push) Successful in 8s
Build on push / core (push) Successful in 18s
Build on push / query (push) Successful in 19s
Build on push / translation (push) Successful in 18s
Build on push / mail (push) Successful in 18s
2025-09-16 18:41:57 +02:00
8ad3e3bdb4 Removed ConfigModel from_dict
All checks were successful
Build on push / prepare (push) Successful in 8s
Build on push / core (push) Successful in 16s
Build on push / query (push) Successful in 17s
Build on push / mail (push) Successful in 17s
Build on push / translation (push) Successful in 18s
2025-09-16 08:51:56 +02:00
b97bc0a3ed Restructuring
All checks were successful
Build on push / prepare (push) Successful in 8s
Build on push / query (push) Successful in 17s
Build on push / core (push) Successful in 26s
Build on push / translation (push) Successful in 15s
Build on push / mail (push) Successful in 17s
2025-09-16 08:48:07 +02:00
5f25400bcd Updated config & environment 2025-09-16 08:39:00 +02:00
5edabbf8c1 Removed docs from sources
All checks were successful
Build on push / prepare (push) Successful in 10s
Build on push / core (push) Successful in 17s
Build on push / query (push) Successful in 18s
Build on push / translation (push) Successful in 15s
Build on push / mail (push) Successful in 16s
2025-09-15 23:08:17 +02:00
62b6435470 Improved logging closes #180
All checks were successful
Build on push / prepare (push) Successful in 9s
Build on push / query (push) Successful in 17s
Build on push / core (push) Successful in 18s
Build on push / mail (push) Successful in 17s
Build on push / translation (push) Successful in 18s
2025-09-15 23:06:38 +02:00
d9d7802f33 Fixed translation build
All checks were successful
Build on push / prepare (push) Successful in 9s
Build on push / query (push) Successful in 19s
Build on push / core (push) Successful in 20s
Build on push / mail (push) Successful in 15s
Build on push / translation (push) Successful in 18s
2025-09-15 21:08:40 +02:00
25b4ca0696 Added cpl-mail
Some checks failed
Build on push / prepare (push) Successful in 9s
Build on push / core (push) Successful in 18s
Build on push / query (push) Successful in 25s
Build on push / translation (push) Failing after 8s
Build on push / mail (push) Successful in 14s
2025-09-15 20:56:07 +02:00
3b120370b8 Added cpl-mail
All checks were successful
Build on push / prepare (push) Successful in 9s
Build on push / translation (push) Successful in 20s
Build on push / mail (push) Successful in 21s
Build on push / query (push) Successful in 21s
Build on push / core (push) Successful in 21s
2025-09-15 18:38:34 +02:00
aac038ef63 Added cpl-mail
Some checks failed
Build on push / core (push) Failing after 9s
Build on push / mail (push) Has been cancelled
Build on push / translation (push) Has been cancelled
Build on push / prepare (push) Has been cancelled
Build on push / query (push) Has been cancelled
2025-09-15 18:35:36 +02:00
784632a0b4 Updated ci
Some checks failed
Build on push / core (push) Has been cancelled
Build on push / query (push) Has been cancelled
Build on push / translation (push) Has been cancelled
Build on push / prepare (push) Has been cancelled
2025-09-15 18:17:23 +02:00
4719c32457 Updated ci
Some checks failed
Build on push / prepare (push) Successful in 9s
Build on push / query (push) Failing after 18s
Build on push / core (push) Successful in 15s
2025-09-15 16:55:57 +02:00
516fa3fb7e Updated ci
Some checks failed
Build on push / prepare (push) Successful in 8s
Build on push / query (push) Successful in 18s
Build on push / core (push) Failing after 18s
2025-09-15 15:28:09 +02:00
2d2bb86720 First update towards cpl2
Some checks failed
Build on push / prepare (push) Failing after 6s
Build on push / build (push) Has been skipped
2025-09-15 15:02:47 +02:00
549 changed files with 4130 additions and 13027 deletions

View File

@@ -0,0 +1,41 @@
name: Build on push
run-name: Build on push
on:
push:
branches:
- dev
jobs:
prepare:
uses: ./.gitea/workflows/prepare.yaml
with:
version_suffix: 'dev'
secrets: inherit
core:
uses: ./.gitea/workflows/package.yaml
needs: [prepare]
with:
working_directory: src/cpl-core
secrets: inherit
query:
uses: ./.gitea/workflows/package.yaml
needs: [prepare]
with:
working_directory: src/cpl-query
secrets: inherit
translation:
uses: ./.gitea/workflows/package.yaml
needs: [ prepare, core ]
with:
working_directory: src/cpl-translation
secrets: inherit
mail:
uses: ./.gitea/workflows/package.yaml
needs: [ prepare, core ]
with:
working_directory: src/cpl-mail
secrets: inherit

View File

@@ -0,0 +1,39 @@
name: Build on push
run-name: Build on push
on:
push:
branches:
- master
jobs:
prepare:
uses: ./.gitea/workflows/prepare.yaml
secrets: inherit
core:
uses: ./.gitea/workflows/package.yaml
needs: [prepare]
with:
working_directory: src/cpl-core
secrets: inherit
query:
uses: ./.gitea/workflows/package.yaml
needs: [prepare]
with:
working_directory: src/cpl-query
secrets: inherit
translation:
uses: ./.gitea/workflows/package.yaml
needs: [ prepare, core ]
with:
working_directory: src/cpl-translation
secrets: inherit
mail:
uses: ./.gitea/workflows/package.yaml
needs: [ prepare, core ]
with:
working_directory: src/cpl-mail
secrets: inherit

View File

@@ -0,0 +1,65 @@
name: Build Package
run-name: Build Python Package
on:
workflow_call:
inputs:
version_suffix:
description: 'Suffix for version (z.B. "dev", "alpha", "beta")'
required: false
type: string
working_directory:
required: true
type: string
jobs:
build:
runs-on: [ runner ]
container: git.sh-edraft.de/sh-edraft.de/act-runner:latest
defaults:
run:
working-directory: ${{ inputs.working_directory }}
steps:
- name: Clone Repository
uses: https://github.com/actions/checkout@v3
with:
token: ${{ secrets.CI_ACCESS_TOKEN }}
- name: Download build version artifact
uses: actions/download-artifact@v3
with:
name: version
- name: Set version
run: |
sed -i -E "s/^version = \".*\"/version = \"$(cat /workspace/sh-edraft.de/cpl/version.txt)\"/" pyproject.toml
echo "Set version to $(cat /workspace/sh-edraft.de/cpl/version.txt)"
cat pyproject.toml
- name: Set pip conf
run: |
cat > .pip.conf <<'EOF'
[global]
extra-index-url = https://git.sh-edraft.de/api/packages/sh-edraft.de/pypi/simple/
EOF
- name: Install Dependencies
run: |
export PIP_CONFIG_FILE=".pip.conf"
pip install build
- name: Build Package
run: |
python -m build --outdir dist
- name: Login to registry git.sh-edraft.de
uses: https://github.com/docker/login-action@v1
with:
registry: git.sh-edraft.de
username: ${{ secrets.CI_USERNAME }}
password: ${{ secrets.CI_ACCESS_TOKEN }}
- name: Push image
run: |
pip install twine
python -m twine upload --repository-url https://git.sh-edraft.de/api/packages/sh-edraft.de/pypi -u ${{ secrets.CI_USERNAME }} -p ${{ secrets.CI_ACCESS_TOKEN }} ./dist/*

View File

@@ -0,0 +1,54 @@
name: Prepare Build
run-name: Prepare Build Version
on:
workflow_call:
inputs:
version_suffix:
description: 'Suffix for version (z.B. "dev", "alpha", "beta")'
required: false
type: string
jobs:
prepare:
runs-on: [ runner ]
container: git.sh-edraft.de/sh-edraft.de/act-runner:latest
steps:
- name: Clone Repository
uses: https://github.com/actions/checkout@v3
with:
token: ${{ secrets.CI_ACCESS_TOKEN }}
- name: Get Date and Build Number
run: |
git fetch --tags
git tag
DATE=$(date +'%Y.%m.%d')
TAG_COUNT=$(git tag -l "${DATE}.*" | wc -l)
BUILD_NUMBER=$(($TAG_COUNT + 1))
VERSION_SUFFIX=${{ inputs.version_suffix }}
if [ -n "$VERSION_SUFFIX" ] && [ "$VERSION_SUFFIX" = "dev" ]; then
BUILD_VERSION="${DATE}.dev${BUILD_NUMBER}"
elif [ -n "$VERSION_SUFFIX" ]; then
BUILD_VERSION="${DATE}.${BUILD_NUMBER}${VERSION_SUFFIX}"
else
BUILD_VERSION="${DATE}.${BUILD_NUMBER}"
fi
echo "$BUILD_VERSION" > version.txt
echo "VERSION $BUILD_VERSION"
- name: Create Git Tag for Build
run: |
git config user.name "ci"
git config user.email "dev@sh-edraft.de"
echo "tag $(cat version.txt)"
git tag $(cat version.txt)
git push origin --tags
- name: Upload build version artifact
uses: actions/upload-artifact@v3
with:
name: version
path: version.txt

2
.pip.conf Normal file
View File

@@ -0,0 +1,2 @@
[global]
extra-index-url = https://git.sh-edraft.de/api/packages/sh-edraft.de/pypi/simple/

153
README.md
View File

@@ -1,153 +0,0 @@
<h1 align="center">CPL - Common python library</h1>
<!-- Summary -->
<p align="center">
<!-- <img src="" alt="cpl-logo" width="120px" height="120px"/> -->
<br>
<i>
CPL is a development platform for python server applications
<br>using Python.</i>
<br>
</p>
## Table of Contents
<!-- TABLE OF CONTENTS -->
<ol>
<li><a href="#Features">Features</a></li>
<li>
<a href="#getting-started">Getting Started</a>
<ul>
<li><a href="#prerequisites">Prerequisites</a></li>
<li><a href="#installation">Installation</a></li>
</ul>
</li>
<li><a href="#roadmap">Roadmap</a></li>
<li><a href="#contributing">Contributing</a></li>
<li><a href="#license">License</a></li>
<li><a href="#contact">Contact</a></li>
</ol>
## Features
<!-- FEATURE OVERVIEW -->
- Expandle
- Application base
- Standardized application classes
- Application object builder
- Application extension classes
- Startup classes
- Startup extension classes
- Configuration
- Configure via object mapped JSON
- Console argument handling
- Console class for in and output
- Banner
- Spinner
- Options (menu)
- Table
- Write
- Write_at
- Write_line
- Write_line_at
- Dependency injection
- Service lifetimes: singleton, scoped and transient
- Providing of application environment
- Environment (development, staging, testing, production)
- Appname
- Customer
- Hostname
- Runtime directory
- Working directory
- Logging
- Standardized logger
- Log-level (FATAL, ERROR, WARN, INFO, DEBUG & TRACE)
- Mail handling
- Send mails
- Pipe classes
- Convert input
- Utils
- Credential manager
- Encryption via BASE64
- PIP wrapper class based on subprocess
- Run pip commands
- String converter to different variants
- to_lower_case
- to_camel_case
- ...
<!-- GETTING STARTED -->
## Getting Started
[Get started with CPL][quickstart].
### Prerequisites
- Install [python] which includes [Pip installs packages][pip]
### Installation
Install the CPL package
```sh
pip install cpl-core --extra-index-url https://pip.sh-edraft.de
```
Install the CPL CLI
```sh
pip install cpl-cli --extra-index-url https://pip.sh-edraft.de
```
Create workspace:
```sh
cpl new <console|library|unittest> <PROJECT NAME>
```
Run the application:
```sh
cd <PROJECT NAME>
cpl start
```
<!-- ROADMAP -->
## Roadmap
See the [open issues](https://git.sh-edraft.de/sh-edraft.de/sh_cpl/issues) for a list of proposed features (and known issues).
<!-- CONTRIBUTING -->
## Contributing
### Contributing Guidelines
Read through our [contributing guidelines][contributing] to learn about our submission process, coding rules and more.
### Want to Help?
Want to file a bug, contribute some code, or improve documentation? Excellent! Read up on our guidelines for [contributing][contributing].
<!-- LICENSE -->
## License
Distributed under the MIT License. See [LICENSE] for more information.
<!-- CONTACT -->
## Contact
Sven Heidemann - sven.heidemann@sh-edraft.de
Project link: [https://git.sh-edraft.de/sh-edraft.de/sh_common_py_lib](https://git.sh-edraft.de/sh-edraft.de/sh_cpl)
<!-- External LINKS -->
[pip_url]: https://pip.sh-edraft.de
[python]: https://www.python.org/
[pip]: https://pypi.org/project/pip/
<!-- Internal LINKS -->
[project]: https://git.sh-edraft.de/sh-edraft.de/sh_cpl
[quickstart]: https://git.sh-edraft.de/sh-edraft.de/sh_cpl/wiki/quickstart
[contributing]: https://git.sh-edraft.de/sh-edraft.de/sh_cpl/wiki/contributing
[license]: LICENSE

View File

@@ -1,151 +0,0 @@
{
"WorkspaceSettings": {
"DefaultProject": "cpl-core",
"Projects": {
"cpl-cli": "src/cpl_cli/cpl-cli.json",
"cpl-core": "src/cpl_core/cpl-core.json",
"cpl-discord": "src/cpl_discord/cpl-discord.json",
"cpl-query": "src/cpl_query/cpl-query.json",
"cpl-translation": "src/cpl_translation/cpl-translation.json",
"set-version": "tools/set_version/set-version.json",
"set-pip-urls": "tools/set_pip_urls/set-pip-urls.json",
"unittests": "unittests/unittests/unittests.json",
"unittests_cli": "unittests/unittests_cli/unittests_cli.json",
"unittests_core": "unittests/unittests_core/unittests_core.json",
"unittests_query": "unittests/unittests_query/unittests_query.json",
"unittests_shared": "unittests/unittests_shared/unittests_shared.json",
"unittests_translation": "unittests/unittests_translation/unittests_translation.json"
},
"Scripts": {
"hello-world": "echo 'Hello World'",
"format": "echo 'Formatting:'; black ./",
"sv": "cpl set-version",
"set-version": "cpl run set-version --dev $ARGS; echo '';",
"spu": "cpl set-pip-urls",
"set-pip-urls": "cpl run set-pip-urls --dev $ARGS; echo '';",
"docs-build": "cpl format; echo 'Build Documentation'; cpl db-core; cpl db-discord; cpl db-query; cpl db-translation; cd docs/; make clean; make html;",
"db-core": "cd docs/; sphinx-apidoc -o source/ ../src/cpl_core; cd ../",
"db-discord": "cd docs/; sphinx-apidoc -o source/ ../src/cpl_discord; cd ../",
"db-query": "cd docs/; sphinx-apidoc -o source/ ../src/cpl_query; cd ../",
"db-translation": "cd docs/; sphinx-apidoc -o source/ ../src/cpl_translation; cd ../",
"db": "cpl docs-build",
"docs-open": "xdg-open $PWD/docs/build/html/index.html &",
"do": "cpl docs-open",
"test": "cpl run unittests",
"pre-build-all": "cpl sv $ARGS; cpl spu $ARGS;",
"build-all": "cpl build-cli; cpl build-core; cpl build-discord; cpl build-query; cpl build-translation; cpl build-set-pip-urls; cpl build-set-version",
"ba": "cpl build-all $ARGS",
"build-cli": "echo 'Build cpl-cli'; cd ./src/cpl_cli; cpl build; cd ../../;",
"build-core": "echo 'Build cpl-core'; cd ./src/cpl_core; cpl build; cd ../../;",
"build-discord": "echo 'Build cpl-discord'; cd ./src/cpl_discord; cpl build; cd ../../;",
"build-query": "echo 'Build cpl-query'; cd ./src/cpl_query; cpl build; cd ../../;",
"build-translation": "echo 'Build cpl-translation'; cd ./src/cpl_translation; cpl build; cd ../../;",
"build-set-pip-urls": "echo 'Build set-pip-urls'; cd ./tools/set_pip_urls; cpl build; cd ../../;",
"build-set-version": "echo 'Build set-version'; cd ./tools/set_version; cpl build; cd ../../;",
"pre-publish-all": "cpl sv $ARGS; cpl spu $ARGS;",
"publish-all": "cpl publish-cli; cpl publish-core; cpl publish-discord; cpl publish-query; cpl publish-translation;",
"pa": "cpl publish-all $ARGS",
"publish-cli": "echo 'Publish cpl-cli'; cd ./src/cpl_cli; cpl publish; cd ../../;",
"publish-core": "echo 'Publish cpl-core'; cd ./src/cpl_core; cpl publish; cd ../../;",
"publish-discord": "echo 'Publish cpl-discord'; cd ./src/cpl_discord; cpl publish; cd ../../;",
"publish-query": "echo 'Publish cpl-query'; cd ./src/cpl_query; cpl publish; cd ../../;",
"publish-translation": "echo 'Publish cpl-translation'; cd ./src/cpl_translation; cpl publish; cd ../../;",
"upload-prod-cli": "echo 'PROD Upload cpl-cli'; cpl upl-prod-cli;",
"upl-prod-cli": "twine upload -r pip.sh-edraft.de dist/cpl-cli/publish/setup/*",
"upload-prod-core": "echo 'PROD Upload cpl-core'; cpl upl-prod-core;",
"upl-prod-core": "twine upload -r pip.sh-edraft.de dist/cpl-core/publish/setup/*",
"upload-prod-discord": "echo 'PROD Upload cpl-discord'; cpl upl-prod-discord;",
"upl-prod-discord": "twine upload -r pip.sh-edraft.de dist/cpl-discord/publish/setup/*",
"upload-prod-query": "echo 'PROD Upload cpl-query'; cpl upl-prod-query;",
"upl-prod-query": "twine upload -r pip.sh-edraft.de dist/cpl-query/publish/setup/*",
"upload-prod-translation": "echo 'PROD Upload cpl-translation'; cpl upl-prod-translation;",
"upl-prod-translation": "twine upload -r pip.sh-edraft.de dist/cpl-translation/publish/setup/*",
"upload-exp-cli": "echo 'EXP Upload cpl-cli'; cpl upl-exp-cli;",
"upl-exp-cli": "twine upload -r pip-exp.sh-edraft.de dist/cpl-cli/publish/setup/*",
"upload-exp-core": "echo 'EXP Upload cpl-core'; cpl upl-exp-core;",
"upl-exp-core": "twine upload -r pip-exp.sh-edraft.de dist/cpl-core/publish/setup/*",
"upload-exp-discord": "echo 'EXP Upload cpl-discord'; cpl upl-exp-discord;",
"upl-exp-discord": "twine upload -r pip-exp.sh-edraft.de dist/cpl-discord/publish/setup/*",
"upload-exp-query": "echo 'EXP Upload cpl-query'; cpl upl-exp-query;",
"upl-exp-query": "twine upload -r pip-exp.sh-edraft.de dist/cpl-query/publish/setup/*",
"upload-exp-translation": "echo 'EXP Upload cpl-translation'; cpl upl-exp-translation;",
"upl-exp-translation": "twine upload -r pip-exp.sh-edraft.de dist/cpl-translation/publish/setup/*",
"upload-dev-cli": "echo 'DEV Upload cpl-cli'; cpl upl-dev-cli;",
"upl-dev-cli": "twine upload -r pip-dev.sh-edraft.de dist/cpl-cli/publish/setup/*",
"upload-dev-core": "echo 'DEV Upload cpl-core'; cpl upl-dev-core;",
"upl-dev-core": "twine upload -r pip-dev.sh-edraft.de dist/cpl-core/publish/setup/*",
"upload-dev-discord": "echo 'DEV Upload cpl-discord'; cpl upl-dev-discord;",
"upl-dev-discord": "twine upload -r pip-dev.sh-edraft.de dist/cpl-discord/publish/setup/*",
"upload-dev-query": "echo 'DEV Upload cpl-query'; cpl upl-dev-query;",
"upl-dev-query": "twine upload -r pip-dev.sh-edraft.de dist/cpl-query/publish/setup/*",
"upload-dev-translation": "echo 'DEV Upload cpl-translation'; cpl upl-dev-translation;",
"upl-dev-translation": "twine upload -r pip-dev.sh-edraft.de dist/cpl-translation/publish/setup/*",
"pre-deploy-prod": "cpl sv $ARGS; cpl spu --environment=production;",
"deploy-prod": "cpl deploy-prod-cli; cpl deploy-prod-core; cpl deploy-prod-discord; cpl deploy-prod-query; cpl deploy-prod-translation;",
"dp": "cpl deploy-prod $ARGS",
"deploy-prod-cli": "cpl publish-cli; cpl upload-prod-cli",
"deploy-prod-core": "cpl publish-core; cpl upload-prod-core",
"deploy-prod-query": "cpl publish-query; cpl upload-prod-query",
"deploy-prod-discord": "cpl publish-discord; cpl upload-prod-discord",
"deploy-prod-translation": "cpl publish-translation; cpl upload-prod-translation",
"pre-deploy-exp": "cpl sv $ARGS; cpl spu --environment=staging;",
"deploy-exp": "cpl deploy-exp-cli; cpl deploy-exp-core; cpl deploy-exp-discord; cpl deploy-exp-query; cpl deploy-exp-translation;",
"de": "cpl deploy-exp $ARGS",
"deploy-exp-cli": "cpl publish-cli; cpl upload-exp-cli",
"deploy-exp-core": "cpl publish-core; cpl upload-exp-core",
"deploy-exp-discord": "cpl publish-discord; cpl upload-exp-discord",
"deploy-exp-query": "cpl publish-query; cpl upload-exp-query",
"deploy-exp-translation": "cpl publish-translation; cpl upload-exp-translation",
"pre-deploy-dev": "cpl sv $ARGS; cpl spu --environment=development;",
"deploy-dev": "cpl deploy-dev-cli; cpl deploy-dev-core; cpl deploy-dev-discord; cpl deploy-dev-query; cpl deploy-dev-translation;",
"dd": "cpl deploy-dev $ARGS",
"deploy-dev-cli": "cpl publish-cli; cpl upload-dev-cli",
"deploy-dev-core": "cpl publish-core; cpl upload-dev-core",
"deploy-dev-discord": "cpl publish-discord; cpl upload-dev-discord",
"deploy-dev-query": "cpl publish-query; cpl upload-dev-query",
"deploy-dev-translation": "cpl publish-query; cpl upload-dev-translation",
"dev-install": "cpl di-core; cpl di-cli; cpl di-query; cpl di-translation;",
"di": "cpl dev-install",
"di-core": "pip install cpl-core --pre --upgrade --extra-index-url https://pip-dev.sh-edraft.de",
"di-cli": "pip install cpl-cli --pre --upgrade --extra-index-url https://pip-dev.sh-edraft.de",
"di-discord": "pip install cpl-discord --pre --upgrade --extra-index-url https://pip-dev.sh-edraft.de",
"di-query": "pip install cpl-query --pre --upgrade --extra-index-url https://pip-dev.sh-edraft.de",
"di-translation": "pip install cpl-translation --pre --upgrade --extra-index-url https://pip-dev.sh-edraft.de",
"prod-install": "cpl pi-core; cpl pi-cli; cpl pi-query; cpl pi-translation;",
"pi": "cpl prod-install",
"pi-core": "pip install cpl-core --pre --upgrade --extra-index-url https://pip.sh-edraft.de",
"pi-cli": "pip install cpl-cli --pre --upgrade --extra-index-url https://pip.sh-edraft.de",
"pi-discord": "pip install cpl-discord --pre --upgrade --extra-index-url https://pip.sh-edraft.de",
"pi-query": "pip install cpl-query --pre --upgrade --extra-index-url https://pip.sh-edraft.de",
"pi-translation": "pip install cpl-translation --pre --upgrade --extra-index-url https://pip.sh-edraft.de"
}
}
}

View File

@@ -0,0 +1,6 @@
from .application_abc import ApplicationABC
from .application_builder import ApplicationBuilder
from .application_builder_abc import ApplicationBuilderABC
from .application_extension_abc import ApplicationExtensionABC
from .startup_abc import StartupABC
from .startup_extension_abc import StartupExtensionABC

View File

@@ -1,26 +1,23 @@
from abc import ABC, abstractmethod
from typing import Optional
from cpl_core.configuration.configuration_abc import ConfigurationABC
from cpl_core.console.console import Console
from cpl_core.dependency_injection.service_provider_abc import ServiceProviderABC
from cpl_core.environment.application_environment_abc import ApplicationEnvironmentABC
from cpl.dependency.service_provider_abc import ServiceProviderABC
from cpl.core.console.console import Console
class ApplicationABC(ABC):
r"""ABC for the Application class
Parameters:
config: :class:`cpl_core.configuration.configuration_abc.ConfigurationABC`
config: :class:`cpl.core.configuration.configuration_abc.ConfigurationABC`
Contains object loaded from appsettings
services: :class:`cpl_core.dependency_injection.service_provider_abc.ServiceProviderABC`
services: :class:`cpl.dependency.service_provider_abc.ServiceProviderABC`
Contains instances of prepared objects
"""
@abstractmethod
def __init__(self, config: ConfigurationABC, services: ServiceProviderABC):
self._configuration: Optional[ConfigurationABC] = config
self._environment: Optional[ApplicationEnvironmentABC] = self._configuration.environment
def __init__(self, services: ServiceProviderABC):
self._services: Optional[ServiceProviderABC] = services
def run(self):
@@ -49,14 +46,12 @@ class ApplicationABC(ABC):
def configure(self):
r"""Configure the application
Called by :class:`cpl_core.application.application_abc.ApplicationABC.run`
Called by :class:`cpl.application.application_abc.ApplicationABC.run`
"""
pass
@abstractmethod
def main(self):
r"""Custom entry point
Called by :class:`cpl_core.application.application_abc.ApplicationABC.run`
Called by :class:`cpl.application.application_abc.ApplicationABC.run`
"""
pass

View File

@@ -0,0 +1,97 @@
from typing import Type, Optional, Callable, Union
from cpl.application.application_abc import ApplicationABC
from cpl.application.application_builder_abc import ApplicationBuilderABC
from cpl.application.application_extension_abc import ApplicationExtensionABC
from cpl.application.async_application_extension_abc import AsyncApplicationExtensionABC
from cpl.application.async_startup_abc import AsyncStartupABC
from cpl.application.async_startup_extension_abc import AsyncStartupExtensionABC
from cpl.application.startup_abc import StartupABC
from cpl.application.startup_extension_abc import StartupExtensionABC
from cpl.core.configuration.configuration import Configuration
from cpl.dependency.service_collection import ServiceCollection
from cpl.core.environment import Environment
class ApplicationBuilder(ApplicationBuilderABC):
r"""This is class is used to build an object of :class:`cpl.application.application_abc.ApplicationABC`
Parameter:
app: Type[:class:`cpl.application.application_abc.ApplicationABC`]
Application to build
"""
def __init__(self, app: Type[ApplicationABC]):
ApplicationBuilderABC.__init__(self)
self._app = app
self._startup: Optional[StartupABC | AsyncStartupABC] = None
self._services = ServiceCollection()
self._app_extensions: list[Type[ApplicationExtensionABC | AsyncApplicationExtensionABC]] = []
self._startup_extensions: list[Type[StartupExtensionABC | AsyncStartupABC]] = []
def use_startup(self, startup: Type[StartupABC | AsyncStartupABC]) -> "ApplicationBuilder":
self._startup = startup()
return self
def use_extension(
self,
extension: Type[
ApplicationExtensionABC | AsyncApplicationExtensionABC | StartupExtensionABC | AsyncStartupExtensionABC
],
) -> "ApplicationBuilder":
if (
issubclass(extension, ApplicationExtensionABC) or issubclass(extension, AsyncApplicationExtensionABC)
) and extension not in self._app_extensions:
self._app_extensions.append(extension)
elif (
issubclass(extension, StartupExtensionABC) or issubclass(extension, AsyncStartupExtensionABC)
) and extension not in self._startup_extensions:
self._startup_extensions.append(extension)
return self
def _build_startup(self):
for ex in self._startup_extensions:
extension = ex()
extension.configure_configuration(Configuration, Environment)
extension.configure_services(self._services, Environment)
if self._startup is not None:
self._startup.configure_configuration(Configuration, Environment)
self._startup.configure_services(self._services, Environment)
async def _build_async_startup(self):
for ex in self._startup_extensions:
extension = ex()
await extension.configure_configuration(Configuration, Environment)
await extension.configure_services(self._services, Environment)
if self._startup is not None:
await self._startup.configure_configuration(Configuration, Environment)
await self._startup.configure_services(self._services, Environment)
def build(self) -> ApplicationABC:
self._build_startup()
config = Configuration
services = self._services.build_service_provider()
for ex in self._app_extensions:
extension = ex()
extension.run(config, services)
return self._app(services)
async def build_async(self) -> ApplicationABC:
await self._build_async_startup()
config = Configuration
services = self._services.build_service_provider()
for ex in self._app_extensions:
extension = ex()
await extension.run(config, services)
return self._app(services)

View File

@@ -1,12 +1,12 @@
from abc import ABC, abstractmethod
from typing import Type
from cpl_core.application.application_abc import ApplicationABC
from cpl_core.application.startup_abc import StartupABC
from cpl.application.application_abc import ApplicationABC
from cpl.application.startup_abc import StartupABC
class ApplicationBuilderABC(ABC):
r"""ABC for the :class:`cpl_core.application.application_builder.ApplicationBuilder`"""
r"""ABC for the :class:`cpl.application.application_builder.ApplicationBuilder`"""
@abstractmethod
def __init__(self, *args):
@@ -17,35 +17,31 @@ class ApplicationBuilderABC(ABC):
r"""Sets the custom startup class to use
Parameter:
startup: Type[:class:`cpl_core.application.startup_abc.StartupABC`]
startup: Type[:class:`cpl.application.startup_abc.StartupABC`]
Startup class to use
"""
pass
@abstractmethod
async def use_startup(self, startup: Type[StartupABC]):
r"""Sets the custom startup class to use async
Parameter:
startup: Type[:class:`cpl_core.application.startup_abc.StartupABC`]
startup: Type[:class:`cpl.application.startup_abc.StartupABC`]
Startup class to use
"""
pass
@abstractmethod
def build(self) -> ApplicationABC:
r"""Creates custom application object
Returns:
Object of :class:`cpl_core.application.application_abc.ApplicationABC`
Object of :class:`cpl.application.application_abc.ApplicationABC`
"""
pass
@abstractmethod
async def build_async(self) -> ApplicationABC:
r"""Creates custom application object async
Returns:
Object of :class:`cpl_core.application.application_abc.ApplicationABC`
Object of :class:`cpl.application.application_abc.ApplicationABC`
"""
pass

View File

@@ -0,0 +1,14 @@
from abc import ABC, abstractmethod
from cpl.core.configuration.configuration import Configuration
from cpl.dependency import ServiceProviderABC
class ApplicationExtensionABC(ABC):
@abstractmethod
def __init__(self):
pass
@abstractmethod
def run(self, config: Configuration, services: ServiceProviderABC):
pass

View File

@@ -0,0 +1,14 @@
from abc import ABC, abstractmethod
from cpl.core.configuration.configuration import Configuration
from cpl.dependency import ServiceProviderABC
class AsyncApplicationExtensionABC(ABC):
@abstractmethod
def __init__(self):
pass
@abstractmethod
async def run(self, config: Configuration, services: ServiceProviderABC):
pass

View File

@@ -0,0 +1,23 @@
from abc import ABC, abstractmethod
from cpl.dependency.service_collection import ServiceCollection
class AsyncStartupABC(ABC):
r"""ABC for the startup class"""
@abstractmethod
def __init__(self):
pass
@abstractmethod
async def configure_configuration(self):
r"""Creates configuration of application"""
@abstractmethod
async def configure_services(self, service: ServiceCollection):
r"""Creates service provider
Parameter:
services: :class:`cpl.dependency.service_collection`
"""

View File

@@ -0,0 +1,31 @@
from abc import ABC, abstractmethod
from cpl.core.configuration.configuration import Configuration
from cpl.dependency.service_collection import ServiceCollection
from cpl.core.environment.environment import Environment
class AsyncStartupExtensionABC(ABC):
r"""ABC for startup extension classes"""
@abstractmethod
def __init__(self):
pass
@abstractmethod
async def configure_configuration(self, config: Configuration, env: Environment):
r"""Creates configuration of application
Parameter:
config: :class:`cpl.core.configuration.configuration_abc.Configuration`
env: :class:`cpl.core.environment.application_environment_abc`
"""
@abstractmethod
async def configure_services(self, service: ServiceCollection, env: Environment):
r"""Creates service provider
Parameter:
services: :class:`cpl.dependency.service_collection`
env: :class:`cpl.core.environment.application_environment_abc`
"""

View File

@@ -0,0 +1,31 @@
from abc import ABC, abstractmethod
from cpl.core.configuration import Configuration
from cpl.dependency.service_collection import ServiceCollection
from cpl.core.environment import Environment
class StartupABC(ABC):
r"""ABC for the startup class"""
@abstractmethod
def __init__(self):
pass
@abstractmethod
def configure_configuration(self, config: Configuration, env: Environment):
r"""Creates configuration of application
Parameter:
config: :class:`cpl.core.configuration.configuration_abc.ConfigurationABC`
env: :class:`cpl.core.environment.application_environment_abc`
"""
@abstractmethod
def configure_services(self, service: ServiceCollection, env: Environment):
r"""Creates service provider
Parameter:
services: :class:`cpl.dependency.service_collection`
env: :class:`cpl.core.environment.application_environment_abc`
"""

View File

@@ -0,0 +1,33 @@
from abc import ABC, abstractmethod
from cpl.core.configuration import Configuration
from cpl.dependency.service_collection import ServiceCollection
from cpl.core.environment.environment import Environment
class StartupExtensionABC(ABC):
r"""ABC for startup extension classes"""
@abstractmethod
def __init__(self):
pass
@abstractmethod
def configure_configuration(self, config: Configuration, env: Environment):
r"""Creates configuration of application
Parameter:
config: :class:`cpl.core.configuration.configuration_abc.ConfigurationABC`
env: :class:`cpl.core.environment.application_environment_abc`
"""
@abstractmethod
def configure_services(self, service: ServiceCollection, env: Environment):
r"""Creates service provider
Parameter:
services: :class:`cpl.dependency.service_collection`
env: :class:`cpl.core.environment.application_environment_abc`
"""

View File

@@ -0,0 +1,30 @@
[build-system]
requires = ["setuptools>=70.1.0", "wheel>=0.43.0"]
build-backend = "setuptools.build_meta"
[project]
name = "cpl-application"
version = "2024.7.0"
description = "CPL application"
readme ="CPL application package"
requires-python = ">=3.12"
license = { text = "MIT" }
authors = [
{ name = "Sven Heidemann", email = "sven.heidemann@sh-edraft.de" }
]
keywords = ["cpl", "application", "backend", "shared", "library"]
dynamic = ["dependencies", "optional-dependencies"]
[project.urls]
Homepage = "https://www.sh-edraft.de"
[tool.setuptools.packages.find]
where = ["."]
include = ["cpl*"]
[tool.setuptools.dynamic]
dependencies = { file = ["requirements.txt"] }
optional-dependencies.dev = { file = ["requirements.dev.txt"] }

View File

@@ -0,0 +1 @@
black==25.1.0

View File

@@ -0,0 +1,2 @@
cpl-core
cpl-dependency

View File

@@ -0,0 +1 @@

View File

@@ -0,0 +1,2 @@
from .configuration import Configuration
from .configuration_model_abc import ConfigurationModelABC

View File

@@ -0,0 +1,134 @@
import inspect
import json
import os
import sys
from typing import Any
from cpl.core.configuration.configuration_model_abc import ConfigurationModelABC
from cpl.core.console.console import Console
from cpl.core.console.foreground_color_enum import ForegroundColorEnum
from cpl.core.environment.environment import Environment
from cpl.core.typing import D, T
from cpl.core.utils.json_processor import JSONProcessor
class Configuration:
_config = {}
@staticmethod
def _print_info(message: str):
r"""Prints an info message
Parameter:
name: :class:`str`
Info name
message: :class:`str`
Info message
"""
Console.set_foreground_color(ForegroundColorEnum.green)
Console.write_line(f"[CONFIG] {message}")
Console.set_foreground_color(ForegroundColorEnum.default)
@staticmethod
def _print_warn(message: str):
r"""Prints a warning
Parameter:
name: :class:`str`
Warning name
message: :class:`str`
Warning message
"""
Console.set_foreground_color(ForegroundColorEnum.yellow)
Console.write_line(f"[CONFIG] {message}")
Console.set_foreground_color(ForegroundColorEnum.default)
@staticmethod
def _print_error(message: str):
r"""Prints an error
Parameter:
name: :class:`str`
Error name
message: :class:`str`
Error message
"""
Console.set_foreground_color(ForegroundColorEnum.red)
Console.write_line(f"[CONFIG] {message}")
Console.set_foreground_color(ForegroundColorEnum.default)
@classmethod
def _load_json_file(cls, file: str, output: bool) -> dict:
r"""Reads the json file
Parameter:
file: :class:`str`
Name of the file
output: :class:`bool`
Specifies whether an output should take place
Returns:
Object of :class:`dict`
"""
try:
# open config file, create if not exists
with open(file, encoding="utf-8") as cfg:
# load json
json_cfg = json.load(cfg)
if output:
cls._print_info(f"Loaded config file: {file}")
return json_cfg
except Exception as e:
cls._print_error(f"Cannot load config file: {file}! -> {e}")
return {}
@classmethod
def add_json_file(cls, name: str, optional: bool = None, output: bool = True, path: str = None):
if os.path.isabs(name):
file_path = name
else:
path_root = Environment.get_cwd()
if path is not None:
path_root = path
if str(path_root).endswith("/") and not name.startswith("/"):
file_path = f"{path_root}{name}"
else:
file_path = f"{path_root}/{name}"
if not os.path.isfile(file_path):
if optional is not True:
if output:
cls._print_error(f"File not found: {file_path}")
sys.exit()
if output:
cls._print_warn(f"Not Loaded config file: {file_path}")
return None
config_from_file = cls._load_json_file(file_path, output)
for sub in ConfigurationModelABC.__subclasses__():
for key, value in config_from_file.items():
if sub.__name__ != key and sub.__name__.replace("Settings", "") != key:
continue
configuration = JSONProcessor.process(sub, value)
cls.set(sub, configuration)
@classmethod
def set(cls, key: Any, value: T):
if inspect.isclass(key):
key = key.__name__
cls._config[key] = value
@classmethod
def get(cls, key: Any, default: D = None) -> T | D:
if inspect.isclass(key):
key = key.__name__
return cls._config.get(key, default)

View File

@@ -0,0 +1,5 @@
from abc import ABC
class ConfigurationModelABC(ABC):
pass

View File

@@ -0,0 +1,4 @@
from .background_color_enum import BackgroundColorEnum
from .console import Console
from ._call import ConsoleCall
from .foreground_color_enum import ForegroundColorEnum

View File

@@ -1,28 +1,29 @@
import os
import sys
import threading
import multiprocessing
import time
from multiprocessing import Process
from termcolor import colored
from cpl_core.console.background_color_enum import BackgroundColorEnum
from cpl_core.console.foreground_color_enum import ForegroundColorEnum
from cpl.core.console.background_color_enum import BackgroundColorEnum
from cpl.core.console.foreground_color_enum import ForegroundColorEnum
class SpinnerThread(threading.Thread):
r"""Thread to show spinner in terminal
class Spinner(Process):
r"""Process to show spinner in terminal
Parameter:
msg_len: :class:`int`
Length of the message
foreground_color: :class:`cpl_core.console.foreground_color.ForegroundColorEnum`
foreground_color: :class:`cpl.core.console.foreground_color.ForegroundColorEnum`
Foreground color of the spinner
background_color: :class:`cpl_core.console.background_color.BackgroundColorEnum`
background_color: :class:`cpl.core.console.background_color.BackgroundColorEnum`
Background color of the spinner
"""
def __init__(self, msg_len: int, foreground_color: ForegroundColorEnum, background_color: BackgroundColorEnum):
threading.Thread.__init__(self)
Process.__init__(self)
self._msg_len = msg_len
self._foreground_color = foreground_color
@@ -50,29 +51,26 @@ class SpinnerThread(threading.Thread):
return color_args
def run(self) -> None:
r"""Entry point of thread, shows the spinner"""
r"""Entry point of process, shows the spinner"""
columns = 0
if sys.platform == "win32":
columns = os.get_terminal_size().columns
else:
term_rows, term_columns = os.popen("stty size", "r").read().split()
values = os.popen("stty size", "r").read().split()
term_rows, term_columns = values if len(values) == 2 else (0, 0)
columns = int(term_columns)
end_msg = "done"
end_msg_pos = columns - self._msg_len - len(end_msg)
if end_msg_pos > 0:
print(f'{"" : >{end_msg_pos}}', end="")
padding = columns - self._msg_len - len(end_msg)
if padding > 0:
print(f'{"" : >{padding}}', end="")
else:
print("", end="")
first = True
spinner = self._spinner()
while self._is_spinning:
if first:
first = False
print(colored(f"{next(spinner): >{len(end_msg) - 1}}", *self._get_color_args()), end="")
else:
print(colored(f"{next(spinner): >{len(end_msg)}}", *self._get_color_args()), end="")
print(colored(f"{next(spinner): >{len(end_msg)}}", *self._get_color_args()), end="")
time.sleep(0.1)
back = ""
for i in range(0, len(end_msg)):
@@ -84,9 +82,10 @@ class SpinnerThread(threading.Thread):
if not self._exit:
print(colored(end_msg, *self._get_color_args()), end="")
def stop_spinning(self):
def stop(self):
r"""Stops the spinner"""
self._is_spinning = False
super().terminate()
time.sleep(0.1)
def exit(self):

View File

@@ -9,14 +9,15 @@ import colorama
from tabulate import tabulate
from termcolor import colored
from cpl_core.console.background_color_enum import BackgroundColorEnum
from cpl_core.console.console_call import ConsoleCall
from cpl_core.console.foreground_color_enum import ForegroundColorEnum
from cpl_core.console.spinner_thread import SpinnerThread
from cpl.core.console.background_color_enum import BackgroundColorEnum
from cpl.core.console._call import ConsoleCall
from cpl.core.console.foreground_color_enum import ForegroundColorEnum
from cpl.core.console._spinner import Spinner
class Console:
r"""Useful functions for handling with input and output"""
colorama.init()
_is_first_write = True
@@ -61,7 +62,7 @@ class Console:
r"""Sets the background color
Parameter:
color: Union[:class:`cpl_core.console.background_color_enum.BackgroundColorEnum`, :class:`str`]
color: Union[:class:`cpl.core.console.background_color_enum.BackgroundColorEnum`, :class:`str`]
Background color of the console
"""
if type(color) is str:
@@ -74,7 +75,7 @@ class Console:
r"""Sets the foreground color
Parameter:
color: Union[:class:`cpl_core.console.background_color_enum.BackgroundColorEnum`, :class:`str`]
color: Union[:class:`cpl.core.console.background_color_enum.BackgroundColorEnum`, :class:`str`]
Foreground color of the console
"""
if type(color) is str:
@@ -365,17 +366,17 @@ class Console:
Message or header of the selection
options: List[:class:`str`]
Selectable options
header_foreground_color: Union[:class:`str`, :class:`cpl_core.console.foreground_color_enum.ForegroundColorEnum`]
header_foreground_color: Union[:class:`str`, :class:`cpl.core.console.foreground_color_enum.ForegroundColorEnum`]
Foreground color of the header
header_background_color: Union[:class:`str`, :class:`cpl_core.console.background_color_enum.BackgroundColorEnum`]
header_background_color: Union[:class:`str`, :class:`cpl.core.console.background_color_enum.BackgroundColorEnum`]
Background color of the header
option_foreground_color: Union[:class:`str`, :class:`cpl_core.console.foreground_color_enum.ForegroundColorEnum`]
option_foreground_color: Union[:class:`str`, :class:`cpl.core.console.foreground_color_enum.ForegroundColorEnum`]
Foreground color of the options
option_background_color: Union[:class:`str`, :class:`cpl_core.console.background_color_enum.BackgroundColorEnum`]
option_background_color: Union[:class:`str`, :class:`cpl.core.console.background_color_enum.BackgroundColorEnum`]
Background color of the options
cursor_foreground_color: Union[:class:`str`, :class:`cpl_core.console.foreground_color_enum.ForegroundColorEnum`]
cursor_foreground_color: Union[:class:`str`, :class:`cpl.core.console.foreground_color_enum.ForegroundColorEnum`]
Foreground color of the cursor
cursor_background_color: Union[:class:`str`, :class:`cpl_core.console.background_color_enum.BackgroundColorEnum`]
cursor_background_color: Union[:class:`str`, :class:`cpl.core.console.background_color_enum.BackgroundColorEnum`]
Background color of the cursor
Returns:
@@ -429,13 +430,13 @@ class Console:
Function to call
args: :class:`list`
Arguments of the function
text_foreground_color: Union[:class:`str`, :class:`cpl_core.console.foreground_color_enum.ForegroundColorEnum`]
text_foreground_color: Union[:class:`str`, :class:`cpl.core.console.foreground_color_enum.ForegroundColorEnum`]
Foreground color of the text
spinner_foreground_color: Union[:class:`str`, :class:`cpl_core.console.foreground_color_enum.ForegroundColorEnum`]
spinner_foreground_color: Union[:class:`str`, :class:`cpl.core.console.foreground_color_enum.ForegroundColorEnum`]
Foreground color of the spinner
text_background_color: Union[:class:`str`, :class:`cpl_core.console.background_color_enum.BackgroundColorEnum`]
text_background_color: Union[:class:`str`, :class:`cpl.core.console.background_color_enum.BackgroundColorEnum`]
Background color of the text
spinner_background_color: Union[:class:`str`, :class:`cpl_core.console.background_color_enum.BackgroundColorEnum`]
spinner_background_color: Union[:class:`str`, :class:`cpl.core.console.background_color_enum.BackgroundColorEnum`]
Background color of the spinner
kwargs: :class:`dict`
Keyword arguments of the call
@@ -463,7 +464,7 @@ class Console:
cls.set_hold_back(True)
spinner = None
if not cls._disabled:
spinner = SpinnerThread(len(message), spinner_foreground_color, spinner_background_color)
spinner = Spinner(len(message), spinner_foreground_color, spinner_background_color)
spinner.start()
return_value = None
@@ -475,7 +476,7 @@ class Console:
cls.close()
if spinner is not None:
spinner.stop_spinning()
spinner.stop()
cls.set_hold_back(False)
cls.set_foreground_color(ForegroundColorEnum.default)

View File

@@ -0,0 +1,2 @@
from .environment_enum import EnvironmentEnum
from .environment import Environment

View File

@@ -0,0 +1,68 @@
import os
from socket import gethostname
from typing import Optional, Type
from cpl.core.environment.environment_enum import EnvironmentEnum
from cpl.core.typing import T
from cpl.core.utils.get_value import get_value
class Environment:
r"""Represents environment of the application
Parameter:
name: :class:`cpl.core.environment.environment_name_enum.EnvironmentNameEnum`
"""
@classmethod
def get_environment(cls):
return cls.get("ENVIRONMENT", str, EnvironmentEnum.production.value)
@classmethod
def set_environment(cls, environment: str):
assert environment is not None and environment != "", "environment must not be None or empty"
assert environment.lower() in [
e.value for e in EnvironmentEnum
], f"environment must be one of {[e.value for e in EnvironmentEnum]}"
cls.set("ENVIRONMENT", environment.lower())
@classmethod
def get_app_name(cls) -> str:
return cls.get("APP_NAME", str)
@classmethod
def set_app_name(cls, app_name: str):
cls.set("APP_NAME", app_name)
@staticmethod
def get_host_name() -> str:
return gethostname()
@staticmethod
def get_cwd() -> str:
return os.getcwd()
@staticmethod
def set_cwd(working_directory: str):
assert working_directory is not None and working_directory != "", "working_directory must not be None or empty"
os.chdir(working_directory)
@staticmethod
def set(key: str, value: T):
assert key is not None and key != "", "key must not be None or empty"
os.environ[key] = str(value)
@staticmethod
def get(key: str, cast_type: Type[T], default: Optional[T] = None) -> Optional[T]:
"""
Get an environment variable and cast it to a specified type.
:param str key: The name of the environment variable.
:param Type[T] cast_type: A callable to cast the variable's value.
:param Optional[T] default: The default value to return if the variable is not found. Defaults to None.The default value to return if the variable is not found. Defaults to None.
:return: The casted value, or None if the variable is not found.
:rtype: Optional[T]
"""
return get_value(dict(os.environ), key, cast_type, default)

View File

@@ -1,7 +1,7 @@
from enum import Enum
class EnvironmentNameEnum(Enum):
class EnvironmentEnum(Enum):
production = "production"
staging = "staging"
testing = "testing"

View File

@@ -0,0 +1,4 @@
from .logger import Logger
from .logger_abc import LoggerABC
from .log_level_enum import LogLevelEnum
from .logging_settings import LogSettings

View File

@@ -0,0 +1,11 @@
from enum import Enum
class LogLevelEnum(Enum):
off = "OFF" # Nothing
trace = "TRC" # Detailed app information's
debug = "DEB" # Detailed app state
info = "INF" # Normal information's
warning = "WAR" # Error that can later be fatal
error = "ERR" # Non fatal error
fatal = "FAT" # Error that cause exit

View File

@@ -0,0 +1,117 @@
import os
import traceback
from datetime import datetime
from cpl.core.console import Console
from cpl.core.log.log_level_enum import LogLevelEnum
from cpl.core.log.logger_abc import LoggerABC
from cpl.core.typing import Messages, Source
class Logger(LoggerABC):
_level = LogLevelEnum.info
_levels = [x for x in LogLevelEnum]
# ANSI color codes for different log levels
_COLORS = {
LogLevelEnum.trace: "\033[37m", # Light Gray
LogLevelEnum.debug: "\033[94m", # Blue
LogLevelEnum.info: "\033[92m", # Green
LogLevelEnum.warning: "\033[93m", # Yellow
LogLevelEnum.error: "\033[91m", # Red
LogLevelEnum.fatal: "\033[95m", # Magenta
}
def __init__(self, source: Source, file_prefix: str = None):
LoggerABC.__init__(self)
assert source is not None and source != "", "Source cannot be None or empty"
self._source = source
if file_prefix is None:
file_prefix = "app"
self._file_prefix = file_prefix
self._create_log_dir()
@property
def log_file(self):
return f"logs/{self._file_prefix}_{datetime.now().strftime('%Y-%m-%d')}.log"
@staticmethod
def _create_log_dir():
if os.path.exists("logs"):
return
os.makedirs("logs")
@classmethod
def set_level(cls, level: LogLevelEnum):
if level in cls._levels:
cls._level = level
else:
raise ValueError(f"Invalid log level: {level}")
@staticmethod
def _ensure_file_size(log_file: str):
if not os.path.exists(log_file) or os.path.getsize(log_file) <= 0.5 * 1024 * 1024:
return
# if exists and size is greater than 300MB, create a new file
os.rename(
log_file,
f"{log_file.split('.log')[0]}_{datetime.now().strftime('%H-%M-%S')}.log",
)
def _write_log_to_file(self, content: str):
file = self.log_file
self._ensure_file_size(file)
with open(file, "a") as log_file:
log_file.write(content + "\n")
log_file.close()
def _log(self, level: LogLevelEnum, *messages: Messages):
try:
if self._levels.index(level) < self._levels.index(self._level):
return
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
formatted_message = self._format_message(level.value, timestamp, *messages)
self._write_log_to_file(formatted_message)
Console.write_line(f"{self._COLORS.get(self._level, '\033[0m')}{formatted_message}\033[0m")
except Exception as e:
print(f"Error while logging: {e} -> {traceback.format_exc()}")
def _format_message(self, level: str, timestamp, *messages: Messages) -> str:
if isinstance(messages, tuple):
messages = list(messages)
if not isinstance(messages, list):
messages = [messages]
messages = [str(message) for message in messages if message is not None]
return f"<{timestamp}> [{level.upper():^3}] [{self._file_prefix}] - [{self._source}]: {' '.join(messages)}"
def header(self, string: str):
self._log(LogLevelEnum.info, string)
def trace(self, *messages: Messages):
self._log(LogLevelEnum.trace, *messages)
def debug(self, *messages: Messages):
self._log(LogLevelEnum.debug, *messages)
def info(self, *messages: Messages):
self._log(LogLevelEnum.info, *messages)
def warning(self, *messages: Messages):
self._log(LogLevelEnum.warning, *messages)
def error(self, message, e: Exception = None):
self._log(LogLevelEnum.error, message, f"{e} -> {traceback.format_exc()}" if e else None)
def fatal(self, message, e: Exception = None, prevent_quit: bool = False):
self._log(LogLevelEnum.fatal, message, f"{e} -> {traceback.format_exc()}" if e else None)
if not prevent_quit:
exit(-1)

View File

@@ -1,12 +1,18 @@
from abc import abstractmethod, ABC
from cpl.core.typing import Messages
class LoggerABC(ABC):
r"""ABC for :class:`cpl_core.logging.logger_service.Logger`"""
r"""ABC for :class:`cpl.core.log.logger_service.Logger`"""
@abstractmethod
def __init__(self):
ABC.__init__(self)
def set_level(self, level: str):
pass
@abstractmethod
def _format_message(self, level: str, timestamp, *messages: Messages) -> str:
pass
@abstractmethod
def header(self, string: str):
@@ -16,10 +22,9 @@ class LoggerABC(ABC):
string: :class:`str`
String to write as header
"""
pass
@abstractmethod
def trace(self, name: str, message: str):
def trace(self, *messages: Messages):
r"""Writes a trace message
Parameter:
@@ -28,10 +33,9 @@ class LoggerABC(ABC):
message: :class:`str`
Message string
"""
pass
@abstractmethod
def debug(self, name: str, message: str):
def debug(self, *messages: Messages):
r"""Writes a debug message
Parameter:
@@ -40,10 +44,9 @@ class LoggerABC(ABC):
message: :class:`str`
Message string
"""
pass
@abstractmethod
def info(self, name: str, message: str):
def info(self, *messages: Messages):
r"""Writes an information
Parameter:
@@ -52,10 +55,9 @@ class LoggerABC(ABC):
message: :class:`str`
Message string
"""
pass
@abstractmethod
def warn(self, name: str, message: str):
def warning(self, *messages: Messages):
r"""Writes an warning
Parameter:
@@ -64,10 +66,9 @@ class LoggerABC(ABC):
message: :class:`str`
Message string
"""
pass
@abstractmethod
def error(self, name: str, message: str, ex: Exception = None):
def error(self, messages: str, e: Exception = None):
r"""Writes an error
Parameter:
@@ -78,10 +79,9 @@ class LoggerABC(ABC):
ex: :class:`Exception`
Thrown exception
"""
pass
@abstractmethod
def fatal(self, name: str, message: str, ex: Exception = None):
def fatal(self, messages: str, e: Exception = None):
r"""Writes an error and ends the program
Parameter:
@@ -92,4 +92,3 @@ class LoggerABC(ABC):
ex: :class:`Exception`
Thrown exception
"""
pass

View File

@@ -1,28 +1,24 @@
import traceback
from typing import Optional
from cpl_core.configuration.configuration_model_abc import ConfigurationModelABC
from cpl_core.console.console import Console
from cpl_core.console.foreground_color_enum import ForegroundColorEnum
from cpl_core.logging.logging_level_enum import LoggingLevelEnum
from cpl_core.logging.logging_settings_name_enum import LoggingSettingsNameEnum
from cpl.core.configuration.configuration_model_abc import ConfigurationModelABC
from cpl.core.log.log_level_enum import LogLevelEnum
class LoggingSettings(ConfigurationModelABC):
class LogSettings(ConfigurationModelABC):
r"""Representation of logging settings"""
def __init__(
self,
path: str = None,
filename: str = None,
console_log_level: LoggingLevelEnum = None,
file_log_level: LoggingLevelEnum = None,
console_log_level: LogLevelEnum = None,
file_log_level: LogLevelEnum = None,
):
ConfigurationModelABC.__init__(self)
self._path: Optional[str] = path
self._filename: Optional[str] = filename
self._console: Optional[LoggingLevelEnum] = console_log_level
self._level: Optional[LoggingLevelEnum] = file_log_level
self._console: Optional[LogLevelEnum] = console_log_level
self._level: Optional[LogLevelEnum] = file_log_level
@property
def path(self) -> str:
@@ -41,17 +37,17 @@ class LoggingSettings(ConfigurationModelABC):
self._filename = filename
@property
def console(self) -> LoggingLevelEnum:
def console(self) -> LogLevelEnum:
return self._console
@console.setter
def console(self, console: LoggingLevelEnum) -> None:
def console(self, console: LogLevelEnum) -> None:
self._console = console
@property
def level(self) -> LoggingLevelEnum:
def level(self) -> LogLevelEnum:
return self._level
@level.setter
def level(self, level: LoggingLevelEnum) -> None:
def level(self, level: LogLevelEnum) -> None:
self._level = level

View File

@@ -0,0 +1,3 @@
from .bool_pipe import BoolPipe
from .ip_address_pipe import IPAddressPipe
from .pipe_abc import PipeABC

View File

@@ -0,0 +1,13 @@
from cpl.core.pipes.pipe_abc import PipeABC
from cpl.core.typing import T
class BoolPipe[bool](PipeABC):
@staticmethod
def to_str(value: T, *args):
return str(value).lower()
@staticmethod
def from_str(value: str, *args) -> T:
return value in ("True", "true", "1", "yes", "y", "Y")

View File

@@ -0,0 +1,38 @@
from cpl.core.pipes.pipe_abc import PipeABC
from cpl.core.typing import T
class IPAddressPipe[list](PipeABC):
@staticmethod
def to_str(value: T, *args) -> str:
string = ""
if len(value) != 4:
raise ValueError("Invalid IP")
for i in range(0, len(value)):
byte = value[i]
if not 0 <= byte <= 255:
raise ValueError("Invalid IP")
if i == len(value) - 1:
string += f"{byte}"
else:
string += f"{byte}."
return string
@staticmethod
def from_str(value: str, *args) -> T:
parts = value.split(".")
if len(parts) != 4:
raise Exception("Invalid IP")
result = []
for part in parts:
byte = int(part)
if not 0 <= byte <= 255:
raise Exception("Invalid IP")
result.append(byte)
return result

View File

@@ -0,0 +1,16 @@
from abc import ABC, abstractmethod
from typing import Generic
from cpl.core.typing import T
class PipeABC(ABC, Generic[T]):
@staticmethod
@abstractmethod
def to_str(value: T, *args) -> str:
pass
@staticmethod
@abstractmethod
def from_str(value: str, *args) -> T:
pass

View File

@@ -0,0 +1,2 @@
from .time_format_settings import TimeFormatSettings
from .time_format_settings_names_enum import TimeFormatSettingsNamesEnum

View File

@@ -1,10 +1,6 @@
import traceback
from typing import Optional
from cpl_core.configuration.configuration_model_abc import ConfigurationModelABC
from cpl_core.console.console import Console
from cpl_core.console.foreground_color_enum import ForegroundColorEnum
from cpl_core.time.time_format_settings_names_enum import TimeFormatSettingsNamesEnum
from cpl.core.configuration.configuration_model_abc import ConfigurationModelABC
class TimeFormatSettings(ConfigurationModelABC):

View File

@@ -0,0 +1,16 @@
from typing import TypeVar, Any
from uuid import UUID
T = TypeVar("T")
D = TypeVar("D")
R = TypeVar("R")
Service = TypeVar("Service")
Source = TypeVar("Source")
Messages = list[Any] | Any
UuidId = str | UUID
SerialId = int
Id = UuidId | SerialId

View File

@@ -0,0 +1,5 @@
from .base64 import Base64
from .credential_manager import CredentialManager
from .json_processor import JSONProcessor
from .pip import Pip
from .string import String

View File

@@ -0,0 +1,43 @@
import base64
from typing import Union
class Base64:
@staticmethod
def encode(string: str) -> str:
"""
Encode a string with base64
:param string:
:return:
"""
return base64.b64encode(string.encode("utf-8")).decode("utf-8")
@staticmethod
def decode(string: str) -> str:
"""
Decode a string with base64
:param string:
:return:
"""
return base64.b64decode(string).decode("utf-8")
@staticmethod
def is_b64(sb: Union[str, bytes]) -> bool:
"""
Check if a string is base64 encoded
:param Union[str, bytes] sb:
:return:
:rtype: bool
"""
try:
if isinstance(sb, str):
# If there's any unicode here, an exception will be thrown and the function will return false
sb_bytes = bytes(sb, "ascii")
elif isinstance(sb, bytes):
sb_bytes = sb
else:
raise ValueError("Argument must be string or bytes")
return base64.b64encode(base64.b64decode(sb_bytes)) == sb_bytes
except ValueError:
return False

View File

@@ -0,0 +1,56 @@
from typing import Type, Optional
from cpl.core.typing import T
def get_value(
source: dict,
key: str,
cast_type: Type[T],
default: Optional[T] = None,
list_delimiter: str = ",",
) -> Optional[T]:
"""
Get value from source dictionary and cast it to a specified type.
:param dict source: The source dictionary.
:param str key: The name of the environment variable.
:param Type[T] cast_type: A callable to cast the variable's value.
:param Optional[T] default: The default value to return if the variable is not found. Defaults to None.
:param str list_delimiter: The delimiter to split the value into a list. Defaults to ",".
:return: The casted value, or None if the key is not found.
:rtype: Optional[T]
"""
if key not in source:
return default
value = source[key]
if isinstance(
value,
cast_type if not hasattr(cast_type, "__origin__") else cast_type.__origin__,
):
# Handle list[int] case explicitly
if hasattr(cast_type, "__origin__") and cast_type.__origin__ == list:
subtype = cast_type.__args__[0] if hasattr(cast_type, "__args__") else None
if subtype is not None:
return [subtype(item) for item in value]
return value
try:
if cast_type == bool:
return value.lower() in ["true", "1"]
if (cast_type if not hasattr(cast_type, "__origin__") else cast_type.__origin__) == list:
if not (value.startswith("[") and value.endswith("]")) and list_delimiter not in value:
raise ValueError("List values must be enclosed in square brackets or use a delimiter.")
if value.startswith("[") and value.endswith("]"):
value = value[1:-1]
value = value.split(list_delimiter)
subtype = cast_type.__args__[0] if hasattr(cast_type, "__args__") else None
return [subtype(item) if subtype is not None else item for item in value]
return cast_type(value)
except (ValueError, TypeError):
return default

View File

@@ -1,7 +1,7 @@
import enum
from inspect import signature, Parameter
from cpl_core.utils import String
from cpl.core.utils.string import String
class JSONProcessor:
@@ -16,7 +16,7 @@ class JSONProcessor:
if parameter.name == "self" or parameter.annotation == Parameter.empty:
continue
name = String.first_to_upper(String.convert_to_camel_case(parameter.name))
name = String.first_to_upper(String.to_camel_case(parameter.name))
name_first_lower = String.first_to_lower(name)
if name in values or name_first_lower in values or name.upper() in values:
value = ""

View File

@@ -4,11 +4,10 @@ import sys
from contextlib import suppress
from typing import Optional
from cpl_core.console import Console
class Pip:
r"""Executes pip commands"""
_executable = sys.executable
_env = os.environ

View File

@@ -1,13 +1,13 @@
import random
import re
import string
import random
class String:
r"""Useful functions for strings"""
@staticmethod
def convert_to_camel_case(chars: str) -> str:
def to_camel_case(s: str) -> str:
r"""Converts string to camel case
Parameter:
@@ -17,16 +17,10 @@ class String:
Returns:
String converted to CamelCase
"""
converted_name = chars
char_set = string.punctuation + " "
for char in char_set:
if char in converted_name:
converted_name = "".join(word.title() for word in converted_name.split(char))
return converted_name
return re.sub(r"(?<!^)(?=[A-Z])", "_", s).lower()
@staticmethod
def convert_to_snake_case(chars: str) -> str:
def to_snake_case(chars: str) -> str:
r"""Converts string to snake case
Parameter:
@@ -56,7 +50,7 @@ class String:
return re.sub(pattern2, r"\1_\2", file_name).lower()
@staticmethod
def first_to_upper(chars: str) -> str:
def first_to_upper(s: str) -> str:
r"""Converts first char to upper
Parameter:
@@ -66,10 +60,10 @@ class String:
Returns:
String with first char as upper
"""
return f"{chars[0].upper()}{chars[1:]}"
return s[0].upper() + s[1:] if s else s
@staticmethod
def first_to_lower(chars: str) -> str:
def first_to_lower(s: str) -> str:
r"""Converts first char to lower
Parameter:
@@ -79,14 +73,24 @@ class String:
Returns:
String with first char as lower
"""
return f"{chars[0].lower()}{chars[1:]}"
return s[0].lower() + s[1:] if s else s
@staticmethod
def random_string(chars: str, length: int) -> str:
def random(length: int, letters=True, digits=False, special_characters=False) -> str:
r"""Creates random string by given chars and length
Returns:
String of random chars
"""
return "".join(random.choice(chars) for _ in range(length))
characters = []
if letters:
characters.append(string.ascii_letters)
if digits:
characters.append(string.digits)
if special_characters:
characters.append(string.punctuation)
return "".join(random.choice(characters) for _ in range(length)) if characters else ""

View File

@@ -0,0 +1,29 @@
[build-system]
requires = ["setuptools>=70.1.0", "wheel>=0.43.0"]
build-backend = "setuptools.build_meta"
[project]
name = "cpl-core"
version = "2024.7.0"
description = "CPL core"
readme = "CPL core package"
requires-python = ">=3.12"
license = "MIT"
authors = [
{ name = "Sven Heidemann", email = "sven.heidemann@sh-edraft.de" }
]
keywords = ["cpl", "core", "backend", "shared", "library"]
dynamic = ["dependencies", "optional-dependencies"]
[project.urls]
Homepage = "https://www.sh-edraft.de"
[tool.setuptools.packages.find]
where = ["."]
include = ["cpl*"]
[tool.setuptools.dynamic]
dependencies = { file = ["requirements.txt"] }
optional-dependencies.dev = { file = ["requirements.dev.txt"] }

View File

@@ -0,0 +1 @@
black==25.1.0

View File

@@ -0,0 +1,6 @@
art==6.5
colorama==0.4.6
tabulate==0.9.0
termcolor==3.1.0
mysql-connector-python==9.4.0
pynput==1.8.1

View File

@@ -0,0 +1,41 @@
from typing import Type
from cpl.dependency import ServiceCollection as _ServiceCollection
from . import mysql as _mysql
from . import postgres as _postgres
from .internal_tables import InternalTables
def _add(collection: _ServiceCollection,db_context: Type, default_port: int, server_type: str):
from cpl.core.console import Console
from cpl.core.configuration import Configuration
from cpl.database.abc.db_context_abc import DBContextABC
from cpl.database.model.server_type import ServerTypes, ServerType
from cpl.database.model.database_settings import DatabaseSettings
from cpl.database.service.migration_service import MigrationService
from cpl.database.schema.executed_migration_dao import ExecutedMigrationDao
try:
ServerType.set_server_type(ServerTypes(server_type))
Configuration.set("DB_DEFAULT_PORT", default_port)
collection.add_singleton(DBContextABC, db_context)
collection.add_singleton(ExecutedMigrationDao)
collection.add_singleton(MigrationService)
except ImportError as e:
Console.error("cpl-database is not installed", str(e))
def add_mysql(collection: _ServiceCollection):
from cpl.database.mysql.db_context import DBContext
from cpl.database.model import ServerTypes
_add(collection, DBContext, 3306, ServerTypes.MYSQL.value)
def add_postgres(collection: _ServiceCollection):
from cpl.database.mysql.db_context import DBContext
from cpl.database.model import ServerTypes
_add(collection, DBContext, 5432, ServerTypes.POSTGRES.value)
_ServiceCollection.with_module(add_mysql, _mysql.__name__)
_ServiceCollection.with_module(add_postgres, _postgres.__name__)

View File

@@ -0,0 +1,68 @@
import textwrap
from typing import Callable
class ExternalDataTempTableBuilder:
def __init__(self):
self._table_name = None
self._fields: dict[str, str] = {}
self._primary_key = "id"
self._join_ref_table = None
self._value_getter = None
@property
def table_name(self) -> str:
return self._table_name
@property
def fields(self) -> dict[str, str]:
return self._fields
@property
def primary_key(self) -> str:
return self._primary_key
@property
def join_ref_table(self) -> str:
return self._join_ref_table
def with_table_name(self, table_name: str) -> "ExternalDataTempTableBuilder":
self._join_ref_table = table_name
if "." in table_name:
table_name = table_name.split(".")[-1]
if not table_name.endswith("_temp"):
table_name = f"{table_name}_temp"
self._table_name = table_name
return self
def with_field(self, name: str, sql_type: str, primary=False) -> "ExternalDataTempTableBuilder":
if primary:
sql_type += " PRIMARY KEY"
self._primary_key = name
self._fields[name] = sql_type
return self
def with_value_getter(self, value_getter: Callable) -> "ExternalDataTempTableBuilder":
self._value_getter = value_getter
return self
async def build(self) -> str:
assert self._table_name is not None, "Table name is required"
assert self._value_getter is not None, "Value getter is required"
values_str = ", ".join([f"{value}" for value in await self._value_getter()])
return textwrap.dedent(
f"""
DROP TABLE IF EXISTS {self._table_name};
CREATE TEMP TABLE {self._table_name} (
{", ".join([f"{k} {v}" for k, v in self._fields.items()])}
);
INSERT INTO {self._table_name} VALUES {values_str};
"""
)

View File

@@ -0,0 +1,5 @@
from .connection_abc import ConnectionABC
from .db_context_abc import DBContextABC
from .db_join_model_abc import DbJoinModelABC
from .db_model_abc import DbModelABC
from .db_model_dao_abc import DbModelDaoABC

View File

@@ -1,12 +1,12 @@
from abc import ABC, abstractmethod
from cpl_core.database.database_settings import DatabaseSettings
from cpl.database.model.database_settings import DatabaseSettings
from mysql.connector.abstracts import MySQLConnectionAbstract
from mysql.connector.cursor import MySQLCursorBuffered
class DatabaseConnectionABC(ABC):
r"""ABC for the :class:`cpl_core.database.connection.database_connection.DatabaseConnection`"""
class ConnectionABC(ABC):
r"""ABC for the :class:`cpl.database.connection.database_connection.DatabaseConnection`"""
@abstractmethod
def __init__(self):
@@ -30,4 +30,3 @@ class DatabaseConnectionABC(ABC):
connection_string: :class:`str`
Database connection string, see: https://docs.sqlalchemy.org/en/14/core/engines.html
"""
pass

View File

@@ -0,0 +1,875 @@
import datetime
from abc import ABC, abstractmethod
from enum import Enum
from types import NoneType
from typing import Generic, Optional, Union, Type, List, Any
from cpl.core.typing import T, Id
from cpl.core.utils import String
from cpl.core.utils.get_value import get_value
from cpl.database._external_data_temp_table_builder import ExternalDataTempTableBuilder
from cpl.database.abc.db_context_abc import DBContextABC
from cpl.database.const import DATETIME_FORMAT
from cpl.database.db_logger import DBLogger
from cpl.database.postgres.sql_select_builder import SQLSelectBuilder
from cpl.database.typing import T_DBM, Attribute, AttributeFilters, AttributeSorts
class DataAccessObjectABC(ABC, Generic[T_DBM]):
@abstractmethod
def __init__(self, source: str, model_type: Type[T_DBM], table_name: str):
from cpl.dependency.service_provider_abc import ServiceProviderABC
self._db = ServiceProviderABC.get_global_provider().get_service(DBContextABC)
self._logger = DBLogger(source)
self._model_type = model_type
self._table_name = table_name
self._logger = DBLogger(source)
self._model_type = model_type
self._table_name = table_name
self._default_filter_condition = None
self.__attributes: dict[str, type] = {}
self.__db_names: dict[str, str] = {}
self.__foreign_tables: dict[str, tuple[str, str]] = {}
self.__foreign_table_keys: dict[str, str] = {}
self.__foreign_dao: dict[str, "DataAccessObjectABC"] = {}
self.__date_attributes: set[str] = set()
self.__ignored_attributes: set[str] = set()
self.__primary_key = "id"
self.__primary_key_type = int
self._external_fields: dict[str, ExternalDataTempTableBuilder] = {}
@property
def table_name(self) -> str:
return self._table_name
def has_attribute(self, attr_name: Attribute) -> bool:
"""
Check if the attribute exists in the DAO
:param Attribute attr_name: Name of the attribute
:return: True if the attribute exists, False otherwise
"""
return attr_name in self.__attributes
def attribute(
self,
attr_name: Attribute,
attr_type: type,
db_name: str = None,
ignore=False,
primary_key=False,
aliases: list[str] = None,
):
"""
Add an attribute for db and object mapping to the data access object
:param Attribute attr_name: Name of the attribute in the object
:param type attr_type: Python type of the attribute to cast db value to
:param str db_name: Name of the field in the database, if None the attribute lowered attr_name without "_" is used
:param bool ignore: Defines if field is ignored for create and update (for e.g. auto increment fields or created/updated fields)
:param bool primary_key: Defines if field is the primary key
:param list[str] aliases: List of aliases for the attribute name
:return:
"""
if isinstance(attr_name, property):
attr_name = attr_name.fget.__name__
self.__attributes[attr_name] = attr_type
if ignore:
self.__ignored_attributes.add(attr_name)
if not db_name:
db_name = attr_name.lower().replace("_", "")
self.__db_names[attr_name] = db_name
self.__db_names[db_name] = db_name
if aliases is not None:
for alias in aliases:
if alias in self.__db_names:
raise ValueError(f"Alias {alias} already exists")
self.__db_names[alias] = db_name
if primary_key:
self.__primary_key = db_name
self.__primary_key_type = attr_type
if attr_type in [datetime, datetime.datetime]:
self.__date_attributes.add(attr_name)
self.__date_attributes.add(db_name)
def reference(
self,
attr: Attribute,
primary_attr: Attribute,
foreign_attr: Attribute,
table_name: str,
reference_dao: "DataAccessObjectABC" = None,
):
"""
Add a reference to another table for the given attribute
:param Attribute attr: Name of the attribute in the object
:param str primary_attr: Name of the primary key in the foreign object
:param str foreign_attr: Name of the foreign key in the object
:param str table_name: Name of the table to reference
:param DataAccessObjectABC reference_dao: The data access object for the referenced table
:return:
"""
if isinstance(attr, property):
attr = attr.fget.__name__
if isinstance(primary_attr, property):
primary_attr = primary_attr.fget.__name__
primary_attr = primary_attr.lower().replace("_", "")
if isinstance(foreign_attr, property):
foreign_attr = foreign_attr.fget.__name__
foreign_attr = foreign_attr.lower().replace("_", "")
self.__foreign_table_keys[attr] = foreign_attr
if reference_dao is not None:
self.__foreign_dao[attr] = reference_dao
if table_name == self._table_name:
return
self.__foreign_tables[attr] = (
table_name,
f"{table_name}.{primary_attr} = {self._table_name}.{foreign_attr}",
)
def use_external_fields(self, builder: ExternalDataTempTableBuilder):
self._external_fields[builder.table_name] = builder
def to_object(self, result: dict) -> T_DBM:
"""
Convert a result from the database to an object
:param dict result: Result from the database
:return:
"""
value_map: dict[str, T] = {}
for db_name, value in result.items():
# Find the attribute name corresponding to the db_name
attr_name = next((k for k, v in self.__db_names.items() if v == db_name), None)
if attr_name:
value_map[attr_name] = self._get_value_from_sql(self.__attributes[attr_name], value)
return self._model_type(**value_map)
def to_dict(self, obj: T_DBM) -> dict:
"""
Convert an object to a dictionary
:param T_DBM obj: Object to convert
:return:
"""
value_map: dict[str, Any] = {}
for attr_name, attr_type in self.__attributes.items():
value = getattr(obj, attr_name)
if isinstance(value, datetime.datetime):
value = value.strftime(DATETIME_FORMAT)
elif isinstance(value, Enum):
value = value.value
value_map[attr_name] = value
for ex_fname in self._external_fields:
ex_field = self._external_fields[ex_fname]
for ex_attr in ex_field.fields:
if ex_attr == self.__primary_key:
continue
value_map[ex_attr] = getattr(obj, ex_attr, None)
return value_map
async def count(self, filters: AttributeFilters = None) -> int:
result = await self._prepare_query(filters=filters, for_count=True)
return result[0]["count"] if result else 0
async def get_history(
self,
entry_id: int,
by_key: str = None,
when: datetime = None,
until: datetime = None,
without_deleted: bool = False,
) -> list[T_DBM]:
"""
Retrieve the history of an entry from the history table.
:param entry_id: The ID of the entry to retrieve history for.
:param by_key: The key to filter by (default is the primary key).
:param when: A specific timestamp to filter the history.
:param until: A timestamp to filter history entries up to a certain point.
:param without_deleted: Exclude deleted entries if True.
:return: A list of historical entries as objects.
"""
f_tables = list(self.__foreign_tables.keys())
history_table = f"{self._table_name}_history"
builder = SQLSelectBuilder(history_table, self.__primary_key)
builder.with_attribute("*")
builder.with_value_condition(
f"{history_table}.{by_key or self.__primary_key}",
"=",
str(entry_id),
f_tables,
)
if self._default_filter_condition:
builder.with_condition(self._default_filter_condition, "", f_tables)
if without_deleted:
builder.with_value_condition(f"{history_table}.deleted", "=", "false", f_tables)
if when:
builder.with_value_condition(
self._attr_from_date_to_char(f"{history_table}.updated"),
"=",
f"'{when.strftime(DATETIME_FORMAT)}'",
f_tables,
)
if until:
builder.with_value_condition(
self._attr_from_date_to_char(f"{history_table}.updated"),
"<=",
f"'{until.strftime(DATETIME_FORMAT)}'",
f_tables,
)
builder.with_order_by(f"{history_table}.updated", "DESC")
query = await builder.build()
result = await self._db.select_map(query)
return [self.to_object(x) for x in result] if result else []
async def get_all(self) -> List[T_DBM]:
result = await self._prepare_query(sorts=[{self.__primary_key: "asc"}])
return [self.to_object(x) for x in result] if result else []
async def get_by_id(self, id: Union[int, str]) -> Optional[T_DBM]:
result = await self._prepare_query(filters=[{self.__primary_key: id}], sorts=[{self.__primary_key: "asc"}])
return self.to_object(result[0]) if result else None
async def find_by_id(self, id: Union[int, str]) -> Optional[T_DBM]:
result = await self._prepare_query(filters=[{self.__primary_key: id}], sorts=[{self.__primary_key: "asc"}])
return self.to_object(result[0]) if result else None
async def get_by(
self,
filters: AttributeFilters = None,
sorts: AttributeSorts = None,
take: int = None,
skip: int = None,
) -> list[T_DBM]:
result = await self._prepare_query(filters, sorts, take, skip)
if not result or len(result) == 0:
raise ValueError("No result found")
return [self.to_object(x) for x in result] if result else []
async def get_single_by(
self,
filters: AttributeFilters = None,
sorts: AttributeSorts = None,
take: int = None,
skip: int = None,
) -> T_DBM:
result = await self._prepare_query(filters, sorts, take, skip)
if not result:
raise ValueError("No result found")
if len(result) > 1:
raise ValueError("More than one result found")
return self.to_object(result[0])
async def find_by(
self,
filters: AttributeFilters = None,
sorts: AttributeSorts = None,
take: int = None,
skip: int = None,
) -> list[T_DBM]:
result = await self._prepare_query(filters, sorts, take, skip)
return [self.to_object(x) for x in result] if result else []
async def find_single_by(
self,
filters: AttributeFilters = None,
sorts: AttributeSorts = None,
take: int = None,
skip: int = None,
) -> Optional[T_DBM]:
result = await self._prepare_query(filters, sorts, take, skip)
if len(result) > 1:
raise ValueError("More than one result found")
return self.to_object(result[0]) if result else None
async def touch(self, obj: T_DBM):
"""
Touch the entry to update the last updated date
:return:
"""
await self._db.execute(
f"""
UPDATE {self._table_name}
SET updated = NOW()
WHERE {self.__primary_key} = {self._get_primary_key_value_sql(obj)};
"""
)
async def touch_many_by_id(self, ids: list[Id]):
"""
Touch the entries to update the last updated date
:return:
"""
if len(ids) == 0:
return
await self._db.execute(
f"""
UPDATE {self._table_name}
SET updated = NOW()
WHERE {self.__primary_key} IN ({", ".join([str(x) for x in ids])});
"""
)
async def _build_create_statement(self, obj: T_DBM, skip_editor=False) -> str:
allowed_fields = [x for x in self.__attributes.keys() if x not in self.__ignored_attributes]
fields = ", ".join([self.__db_names[x] for x in allowed_fields])
fields = f"{'EditorId' if not skip_editor else ''}{f', {fields}' if not skip_editor and len(fields) > 0 else f'{fields}'}"
values = ", ".join([self._get_value_sql(getattr(obj, x)) for x in allowed_fields])
values = f"{await self._get_editor_id(obj) if not skip_editor else ''}{f', {values}' if not skip_editor and len(values) > 0 else f'{values}'}"
return f"""
INSERT INTO {self._table_name} (
{fields}
) VALUES (
{values}
)
RETURNING {self.__primary_key};
"""
async def create(self, obj: T_DBM, skip_editor=False) -> int:
self._logger.debug(f"create {type(obj).__name__} {obj.__dict__}")
result = await self._db.execute(await self._build_create_statement(obj, skip_editor))
return self._get_value_from_sql(self.__primary_key_type, result[0][0])
async def create_many(self, objs: list[T_DBM], skip_editor=False) -> list[int]:
if len(objs) == 0:
return []
self._logger.debug(f"create many {type(objs[0]).__name__} {len(objs)} {[x.__dict__ for x in objs]}")
query = ""
for obj in objs:
query += await self._build_create_statement(obj, skip_editor)
result = await self._db.execute(query)
return [self._get_value_from_sql(self.__primary_key_type, x[0]) for x in result]
async def _build_update_statement(self, obj: T_DBM, skip_editor=False) -> str:
allowed_fields = [x for x in self.__attributes.keys() if x not in self.__ignored_attributes]
fields = ", ".join(
[f"{self.__db_names[x]} = {self._get_value_sql(getattr(obj, x, None))}" for x in allowed_fields]
)
fields = f"{f'EditorId = {await self._get_editor_id(obj)}' if not skip_editor else ''}{f', {fields}' if not skip_editor and len(fields) > 0 else f'{fields}'}"
return f"""
UPDATE {self._table_name}
SET {fields}
WHERE {self.__primary_key} = {self._get_primary_key_value_sql(obj)};
"""
async def update(self, obj: T_DBM, skip_editor=False):
self._logger.debug(f"update {type(obj).__name__} {obj.__dict__}")
await self._db.execute(await self._build_update_statement(obj, skip_editor))
async def update_many(self, objs: list[T_DBM], skip_editor=False):
if len(objs) == 0:
return
self._logger.debug(f"update many {type(objs[0]).__name__} {len(objs)} {[x.__dict__ for x in objs]}")
query = ""
for obj in objs:
query += await self._build_update_statement(obj, skip_editor)
await self._db.execute(query)
async def _build_delete_statement(self, obj: T_DBM, hard_delete: bool = False) -> str:
if hard_delete:
return f"""
DELETE FROM {self._table_name}
WHERE {self.__primary_key} = {self._get_primary_key_value_sql(obj)};
"""
return f"""
UPDATE {self._table_name}
SET EditorId = {await self._get_editor_id(obj)},
Deleted = true
WHERE {self.__primary_key} = {self._get_primary_key_value_sql(obj)};
"""
async def delete(self, obj: T_DBM, hard_delete: bool = False):
self._logger.debug(f"delete {type(obj).__name__} {obj.__dict__}")
await self._db.execute(await self._build_delete_statement(obj, hard_delete))
async def delete_many(self, objs: list[T_DBM], hard_delete: bool = False):
if len(objs) == 0:
return
self._logger.debug(f"delete many {type(objs[0]).__name__} {len(objs)} {[x.__dict__ for x in objs]}")
query = ""
for obj in objs:
query += await self._build_delete_statement(obj, hard_delete)
await self._db.execute(query)
async def _build_restore_statement(self, obj: T_DBM) -> str:
return f"""
UPDATE {self._table_name}
SET EditorId = {await self._get_editor_id(obj)},
Deleted = false
WHERE {self.__primary_key} = {self._get_primary_key_value_sql(obj)};
"""
async def restore(self, obj: T_DBM):
self._logger.debug(f"restore {type(obj).__name__} {obj.__dict__}")
await self._db.execute(await self._build_restore_statement(obj))
async def restore_many(self, objs: list[T_DBM]):
if len(objs) == 0:
return
self._logger.debug(f"restore many {type(objs[0]).__name__} {len(objs)} {objs[0].__dict__}")
query = ""
for obj in objs:
query += await self._build_restore_statement(obj)
await self._db.execute(query)
async def _prepare_query(
self,
filters: AttributeFilters = None,
sorts: AttributeSorts = None,
take: int = None,
skip: int = None,
for_count=False,
) -> list[dict]:
"""
Prepares and executes a query using the SQLBuilder with the given parameters.
:param filters: Conditions to filter the query.
:param sorts: Sorting attributes and directions.
:param take: Limit the number of results.
:param skip: Offset the results.
:return: Query result as a list of dictionaries.
"""
external_table_deps = []
builder = SQLSelectBuilder(self._table_name, self.__primary_key)
for temp in self._external_fields:
builder.with_temp_table(self._external_fields[temp])
if for_count:
builder.with_attribute("COUNT(*)", ignore_table_name=True)
else:
builder.with_attribute("*")
for attr in self.__foreign_tables:
table, join_condition = self.__foreign_tables[attr]
builder.with_left_join(table, join_condition)
if filters:
await self._build_conditions(builder, filters, external_table_deps)
if sorts:
self._build_sorts(builder, sorts, external_table_deps)
if take:
builder.with_limit(take)
if skip:
builder.with_offset(skip)
for external_table in external_table_deps:
builder.use_temp_table(external_table)
query = await builder.build()
return await self._db.select_map(query)
async def _build_conditions(
self,
builder: SQLSelectBuilder,
filters: AttributeFilters,
external_table_deps: list[str],
):
"""
Builds SQL conditions from GraphQL-like filters and adds them to the SQLBuilder.
:param builder: The SQLBuilder instance to add conditions to.
:param filters: GraphQL-like filter structure.
:param external_table_deps: List to store external table dependencies.
"""
if not isinstance(filters, list):
filters = [filters]
for filter_group in filters:
sql_conditions = self._graphql_to_sql_conditions(filter_group, external_table_deps)
for attr, operator, value in sql_conditions:
if attr in self.__foreign_table_keys:
attr = self.__foreign_table_keys[attr]
recursive_join = self._get_recursive_reference_join(attr)
if recursive_join is not None:
builder.with_left_join(*recursive_join)
external_table = self._get_external_field_key(attr)
if external_table is not None:
external_table_deps.append(external_table)
if operator == "fuzzy":
builder.with_levenshtein_condition(attr)
elif operator in [
"IS NULL",
"IS NOT NULL",
]: # operator without value
builder.with_condition(
attr,
operator,
[
x[0]
for fdao in self.__foreign_dao
for x in self.__foreign_dao[fdao].__foreign_tables.values()
],
)
else:
if attr in self.__date_attributes or String.to_snake_case(attr) in self.__date_attributes:
attr = self._attr_from_date_to_char(f"{self._table_name}.{attr}")
builder.with_value_condition(
attr,
operator,
self._get_value_sql(value),
[
x[0]
for fdao in self.__foreign_dao
for x in self.__foreign_dao[fdao].__foreign_tables.values()
],
)
def _graphql_to_sql_conditions(
self, graphql_structure: dict, external_table_deps: list[str]
) -> list[tuple[str, str, Any]]:
"""
Converts a GraphQL-like structure to SQL conditions.
:param graphql_structure: The GraphQL-like filter structure.
:param external_table_deps: List to track external table dependencies.
:return: A list of tuples (attribute, operator, value).
"""
operators = {
"equal": "=",
"notEqual": "!=",
"greater": ">",
"greaterOrEqual": ">=",
"less": "<",
"lessOrEqual": "<=",
"isNull": "IS NULL",
"isNotNull": "IS NOT NULL",
"contains": "LIKE", # Special handling in _graphql_to_sql_conditions
"notContains": "NOT LIKE", # Special handling in _graphql_to_sql_conditions
"startsWith": "LIKE", # Special handling in _graphql_to_sql_conditions
"endsWith": "LIKE", # Special handling in _graphql_to_sql_conditions
"in": "IN",
"notIn": "NOT IN",
}
conditions = []
def parse_node(node, parent_key=None, parent_dao=None):
if not isinstance(node, dict):
return
if isinstance(node, list):
conditions.append((parent_key, "IN", node))
return
for key, value in node.items():
if isinstance(key, property):
key = key.fget.__name__
external_fields_table_name_by_parent = self._get_external_field_key(parent_key)
external_fields_table_name = self._get_external_field_key(key)
external_field = (
external_fields_table_name
if external_fields_table_name_by_parent is None
else external_fields_table_name_by_parent
)
if key == "fuzzy":
self._handle_fuzzy_filter_conditions(conditions, external_table_deps, value)
elif parent_dao is not None and key in parent_dao.__db_names:
parse_node(value, f"{parent_dao.table_name}.{key}")
continue
elif external_field is not None:
external_table_deps.append(external_field)
parse_node(value, f"{external_field}.{key}")
elif parent_key in self.__foreign_table_keys:
if key in operators:
parse_node({key: value}, self.__foreign_table_keys[parent_key])
continue
if parent_key in self.__foreign_dao:
foreign_dao = self.__foreign_dao[parent_key]
if key in foreign_dao.__foreign_tables:
parse_node(
value,
f"{self.__foreign_tables[parent_key][0]}.{foreign_dao.__foreign_table_keys[key]}",
foreign_dao.__foreign_dao[key],
)
continue
if parent_key in self.__foreign_tables:
parse_node(value, f"{self.__foreign_tables[parent_key][0]}.{key}")
continue
parse_node({parent_key: value})
elif key in operators:
operator = operators[key]
if key == "contains" or key == "notContains":
value = f"%{value}%"
elif key == "in" or key == "notIn":
value = value
elif key == "startsWith":
value = f"{value}%"
elif key == "endsWith":
value = f"%{value}"
elif key == "isNull" or key == "isNotNull":
is_null_value = value.get("equal", None) if isinstance(value, dict) else value
if is_null_value is None:
operator = operators[key]
elif (key == "isNull" and is_null_value) or (key == "isNotNull" and not is_null_value):
operator = "IS NULL"
else:
operator = "IS NOT NULL"
conditions.append((parent_key, operator, None))
elif (key == "equal" or key == "notEqual") and value is None:
operator = operators["isNull"]
conditions.append((parent_key, operator, value))
elif isinstance(value, dict):
if key in self.__foreign_table_keys:
parse_node(value, key)
elif key in self.__db_names and parent_key is not None:
parse_node({f"{parent_key}": value})
elif key in self.__db_names:
parse_node(value, self.__db_names[key])
else:
parse_node(value, key)
elif value is None:
conditions.append((self.__db_names[key], "IS NULL", value))
else:
conditions.append((self.__db_names[key], "=", value))
parse_node(graphql_structure)
return conditions
def _handle_fuzzy_filter_conditions(self, conditions, external_field_table_deps, sub_values):
# Extract fuzzy filter parameters
fuzzy_fields = get_value(sub_values, "fields", list[str])
fuzzy_term = get_value(sub_values, "term", str)
fuzzy_threshold = get_value(sub_values, "threshold", int, 5)
if not fuzzy_fields or not fuzzy_term:
raise ValueError("Fuzzy filter must include 'fields' and 'term'.")
fuzzy_fields_db_names = []
# Map fields to their database names
for fuzzy_field in fuzzy_fields:
external_fields_table_name = self._get_external_field_key(fuzzy_field)
if external_fields_table_name is not None:
external_fields_table = self._external_fields[external_fields_table_name]
fuzzy_fields_db_names.append(f"{external_fields_table.table_name}.{fuzzy_field}")
external_field_table_deps.append(external_fields_table.table_name)
elif fuzzy_field in self.__db_names:
fuzzy_fields_db_names.append(f"{self._table_name}.{self.__db_names[fuzzy_field]}")
elif fuzzy_field in self.__foreign_tables:
fuzzy_fields_db_names.append(f"{self._table_name}.{self.__foreign_table_keys[fuzzy_field]}")
else:
fuzzy_fields_db_names.append(self.__db_names[String.to_snake_case(fuzzy_field)][0])
# Build fuzzy conditions for each field
fuzzy_conditions = self._build_fuzzy_conditions(fuzzy_fields_db_names, fuzzy_term, fuzzy_threshold)
# Combine conditions with OR and append to the main conditions
conditions.append((f"({' OR '.join(fuzzy_conditions)})", "fuzzy", None))
@staticmethod
def _build_fuzzy_conditions(fields: list[str], term: str, threshold: int = 10) -> list[str]:
conditions = []
for field in fields:
conditions.append(f"levenshtein({field}::TEXT, '{term}') <= {threshold}") # Adjust the threshold as needed
return conditions
def _get_external_field_key(self, field_name: str) -> Optional[str]:
"""
Returns the key to get the external field if found, otherwise None.
:param str field_name: The name of the field to search for.
:return: The key if found, otherwise None.
:rtype: Optional[str]
"""
if field_name is None:
return None
for key, builder in self._external_fields.items():
if field_name in builder.fields and field_name not in self.__db_names:
return key
return None
def _get_recursive_reference_join(self, attr: str) -> Optional[tuple[str, str]]:
parts = attr.split(".")
table_name = ".".join(parts[:-1])
if table_name == self._table_name or table_name == "":
return None
all_foreign_tables = {
x[0]: x[1]
for x in [
*[x for x in self.__foreign_tables.values() if x[0] != self._table_name],
*[x for fdao in self.__foreign_dao for x in self.__foreign_dao[fdao].__foreign_tables.values()],
]
}
if not table_name in all_foreign_tables:
return None
return table_name, all_foreign_tables[table_name]
def _build_sorts(
self,
builder: SQLSelectBuilder,
sorts: AttributeSorts,
external_table_deps: list[str],
):
"""
Resolves complex sorting structures into SQL-compatible sorting conditions.
Tracks external table dependencies.
:param builder: The SQLBuilder instance to add sorting to.
:param sorts: Sorting attributes and directions in a complex structure.
:param external_table_deps: List to track external table dependencies.
"""
def parse_sort_node(node, parent_key=None):
if isinstance(node, dict):
for key, value in node.items():
if isinstance(value, dict):
# Recursively parse nested structures
parse_sort_node(value, key)
elif isinstance(value, str) and value.lower() in ["asc", "desc"]:
external_table = self._get_external_field_key(key)
if external_table:
external_table_deps.append(external_table)
key = f"{external_table}.{key}"
if parent_key in self.__foreign_tables:
key = f"{self.__foreign_tables[parent_key][0]}.{key}"
builder.with_order_by(key, value.upper())
else:
raise ValueError(f"Invalid sort direction: {value}")
elif isinstance(node, list):
for item in node:
parse_sort_node(item)
else:
raise ValueError(f"Invalid sort structure: {node}")
parse_sort_node(sorts)
def _get_value_sql(self, value: Any) -> str:
if isinstance(value, str):
if value.lower() == "null":
return "NULL"
return f"'{value}'"
if isinstance(value, NoneType):
return "NULL"
if value is None:
return "NULL"
if isinstance(value, Enum):
return f"'{value.value}'"
if isinstance(value, bool):
return "true" if value else "false"
if isinstance(value, list):
if len(value) == 0:
return "()"
return f"({', '.join([self._get_value_sql(x) for x in value])})"
if isinstance(value, datetime.datetime):
if value.tzinfo is None:
value = value.replace(tzinfo=datetime.timezone.utc)
return f"'{value.strftime(DATETIME_FORMAT)}'"
return str(value)
@staticmethod
def _get_value_from_sql(cast_type: type, value: Any) -> Optional[T]:
"""
Get the value from the query result and cast it to the correct type
:param type cast_type:
:param Any value:
:return Optional[T]: Casted value, when value is str "NULL" None is returned
"""
if isinstance(value, str) and "NULL" in value:
return None
if isinstance(value, NoneType):
return None
if isinstance(value, cast_type):
return value
return cast_type(value)
def _get_primary_key_value_sql(self, obj: T_DBM) -> str:
value = getattr(obj, self.__primary_key)
if isinstance(value, str):
return f"'{value}'"
return value
@staticmethod
def _attr_from_date_to_char(attr: str) -> str:
return f"TO_CHAR({attr}, 'YYYY-MM-DD HH24:MI:SS.US TZ')"
@staticmethod
async def _get_editor_id(obj: T_DBM):
editor_id = obj.editor_id
# if editor_id is None:
# user = get_user()
# if user is not None:
# editor_id = user.id
return editor_id if editor_id is not None else "NULL"

View File

@@ -0,0 +1,53 @@
from abc import ABC, abstractmethod
from typing import Any
from cpl.database.model.database_settings import DatabaseSettings
class DBContextABC(ABC):
r"""ABC for the :class:`cpl.database.context.database_context.DatabaseContext`"""
@abstractmethod
def connect(self, database_settings: DatabaseSettings):
r"""Connects to a database by connection settings
Parameter:
database_settings :class:`cpl.database.database_settings.DatabaseSettings`
"""
@abstractmethod
async def execute(self, statement: str, args=None, multi=True) -> list[list]:
r"""Runs SQL Statements
Parameter:
statement: :class:`str`
args: :class:`list` | :class:`tuple` | :class:`dict` | :class:`None`
multi: :class:`bool`
Returns:
list: Fetched list of executed elements
"""
@abstractmethod
async def select_map(self, statement: str, args=None) -> list[dict]:
r"""Runs SQL Select Statements and returns a list of dictionaries
Parameter:
statement: :class:`str`
args: :class:`list` | :class:`tuple` | :class:`dict` | :class:`None`
Returns:
list: Fetched list of executed elements as dictionary
"""
@abstractmethod
async def select(self, statement: str, args=None) -> list[str] | list[tuple] | list[Any]:
r"""Runs SQL Select Statements and returns a list of dictionaries
Parameter:
statement: :class:`str`
args: :class:`list` | :class:`tuple` | :class:`dict` | :class:`None`
Returns:
list: Fetched list of executed elements
"""

View File

@@ -0,0 +1,30 @@
from datetime import datetime
from typing import Optional
from cpl.core.typing import Id, SerialId
from cpl.database.abc.db_model_abc import DbModelABC
class DbJoinModelABC[T](DbModelABC[T]):
def __init__(
self,
id: Id,
source_id: Id,
foreign_id: Id,
deleted: bool = False,
editor_id: Optional[SerialId] = None,
created: Optional[datetime] = None,
updated: Optional[datetime] = None,
):
DbModelABC.__init__(self, id, deleted, editor_id, created, updated)
self._source_id = source_id
self._foreign_id = foreign_id
@property
def source_id(self) -> Id:
return self._source_id
@property
def foreign_id(self) -> Id:
return self._foreign_id

View File

@@ -0,0 +1,79 @@
from abc import ABC
from datetime import datetime, timezone
from typing import Optional, Generic
from cpl.core.typing import Id, SerialId, T
class DbModelABC(ABC, Generic[T]):
def __init__(
self,
id: Id,
deleted: bool = False,
editor_id: Optional[SerialId] = None,
created: Optional[datetime] = None,
updated: Optional[datetime] = None,
):
self._id = id
self._deleted = deleted
self._editor_id = editor_id
self._created = created if created is not None else datetime.now(timezone.utc).isoformat()
self._updated = updated if updated is not None else datetime.now(timezone.utc).isoformat()
@property
def id(self) -> Id:
return self._id
@property
def deleted(self) -> bool:
return self._deleted
@deleted.setter
def deleted(self, value: bool):
self._deleted = value
@property
def editor_id(self) -> SerialId:
return self._editor_id
@editor_id.setter
def editor_id(self, value: SerialId):
self._editor_id = value
# @async_property
# async def editor(self):
# if self._editor_id is None:
# return None
#
# from data.schemas.administration.user_dao import userDao
#
# return await userDao.get_by_id(self._editor_id)
@property
def created(self) -> datetime:
return self._created
@property
def updated(self) -> datetime:
return self._updated
@updated.setter
def updated(self, value: datetime):
self._updated = value
def to_dict(self) -> dict:
result = {}
for name, value in self.__dict__.items():
if not name.startswith("_") or name.endswith("_"):
continue
if isinstance(value, datetime):
value = value.isoformat()
if not isinstance(value, str):
value = str(value)
result[name.replace("_", "")] = value
return result

View File

@@ -0,0 +1,25 @@
from abc import abstractmethod
from datetime import datetime
from typing import Type
from cpl.database.abc.data_access_object_abc import DataAccessObjectABC
from cpl.database.abc.db_model_abc import DbModelABC
from cpl.database.internal_tables import InternalTables
class DbModelDaoABC[T_DBM](DataAccessObjectABC[T_DBM]):
@abstractmethod
def __init__(self, source: str, model_type: Type[T_DBM], table_name: str):
DataAccessObjectABC.__init__(self, source, model_type, table_name)
self.attribute(DbModelABC.id, int, ignore=True)
self.attribute(DbModelABC.deleted, bool)
self.attribute(DbModelABC.editor_id, int, ignore=True) # handled by db trigger
self.reference(
"editor", "id", DbModelABC.editor_id, InternalTables.users
) # not relevant for updates due to editor_id
self.attribute(DbModelABC.created, datetime, ignore=True) # handled by db trigger
self.attribute(DbModelABC.updated, datetime, ignore=True) # handled by db trigger

View File

@@ -0,0 +1 @@
DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S.%f %z"

View File

@@ -0,0 +1,8 @@
from cpl.core.log import Logger
from cpl.core.typing import Source
class DBLogger(Logger):
def __init__(self, source: Source):
Logger.__init__(self, source, "db")

View File

@@ -0,0 +1,15 @@
from cpl.database.model.server_type import ServerTypes, ServerType
class InternalTables:
@classmethod
@property
def users(cls) -> str:
return "administration.users" if ServerType.server_type is ServerTypes.POSTGRES else "users"
@classmethod
@property
def executed_migrations(cls) -> str:
return "system._executed_migrations" if ServerType.server_type is ServerTypes.POSTGRES else "_executed_migrations"

View File

@@ -0,0 +1,3 @@
from .database_settings import DatabaseSettings
from .migration import Migration
from .server_type import ServerTypes

View File

@@ -1,6 +1,9 @@
from typing import Optional
from cpl_core.configuration.configuration_model_abc import ConfigurationModelABC
from cpl.core.configuration import Configuration
from cpl.core.configuration.configuration_model_abc import ConfigurationModelABC
from cpl.core.environment import Environment
from cpl.core.utils import Base64
class DatabaseSettings(ConfigurationModelABC):
@@ -8,23 +11,23 @@ class DatabaseSettings(ConfigurationModelABC):
def __init__(
self,
host: str = None,
port: int = 3306,
user: str = None,
password: str = None,
database: str = None,
charset: str = "utf8mb4",
use_unicode: bool = False,
buffered: bool = False,
auth_plugin: str = "caching_sha2_password",
ssl_disabled: bool = False,
host: str = Environment.get("DB_HOST", str),
port: int = Environment.get("DB_PORT", str, Configuration.get("DB_DEFAULT_PORT", 0)),
user: str = Environment.get("DB_USER", str),
password: str = Environment.get("DB_PASSWORD", str),
database: str = Environment.get("DB_DATABASE", str),
charset: str = Environment.get("DB_CHARSET", str, "utf8mb4"),
use_unicode: bool = Environment.get("DB_USE_UNICODE", bool, False),
buffered: bool = Environment.get("DB_BUFFERED", bool, False),
auth_plugin: str = Environment.get("DB_AUTH_PLUGIN", str, "caching_sha2_password"),
ssl_disabled: bool = Environment.get("DB_SSL_DISABLED", bool, False),
):
ConfigurationModelABC.__init__(self)
self._host: Optional[str] = host
self._port: Optional[int] = port
self._user: Optional[str] = user
self._password: Optional[str] = password
self._password: Optional[str] = Base64.decode(password) if Base64.is_b64(password) else password
self._database: Optional[str] = database
self._charset: Optional[str] = charset
self._use_unicode: Optional[bool] = use_unicode

View File

@@ -0,0 +1,12 @@
class Migration:
def __init__(self, name: str, script: str):
self._name = name
self._script = script
@property
def name(self) -> str:
return self._name
@property
def script(self) -> str:
return self._script

View File

@@ -0,0 +1,21 @@
from enum import Enum
class ServerTypes(Enum):
POSTGRES = "postgres"
MYSQL = "mysql"
class ServerType:
_server_type: ServerTypes = None
@classmethod
def set_server_type(cls, server_type: ServerTypes):
assert server_type is not None, "server_type must not be None"
assert isinstance(server_type, ServerTypes), f"Expected ServerType but got {type(server_type)}"
cls._server_type = server_type
@classmethod
@property
def server_type(cls) -> ServerTypes:
assert cls._server_type is not None, "Server type is not set"
return cls._server_type

View File

@@ -4,16 +4,16 @@ import mysql.connector as sql
from mysql.connector.abstracts import MySQLConnectionAbstract
from mysql.connector.cursor import MySQLCursorBuffered
from cpl_core.database.connection.database_connection_abc import DatabaseConnectionABC
from cpl_core.database.database_settings import DatabaseSettings
from cpl_core.utils.credential_manager import CredentialManager
from cpl.database.abc.connection_abc import ConnectionABC
from cpl.database.database_settings import DatabaseSettings
from cpl.core.utils.credential_manager import CredentialManager
class DatabaseConnection(DatabaseConnectionABC):
class DatabaseConnection(ConnectionABC):
r"""Representation of the database connection"""
def __init__(self):
DatabaseConnectionABC.__init__(self)
ConnectionABC.__init__(self)
self._database: Optional[MySQLConnectionAbstract] = None
self._cursor: Optional[MySQLCursorBuffered] = None

View File

@@ -0,0 +1,84 @@
import uuid
from typing import Any, List, Dict, Tuple, Union
from mysql.connector import Error as MySQLError, PoolError
from cpl.core.configuration import Configuration
from cpl.core.environment import Environment
from cpl.database.abc.db_context_abc import DBContextABC
from cpl.database.db_logger import DBLogger
from cpl.database.model.database_settings import DatabaseSettings
from cpl.database.mysql.mysql_pool import MySQLPool
_logger = DBLogger(__name__)
class DBContext(DBContextABC):
def __init__(self):
DBContextABC.__init__(self)
self._pool: MySQLPool = None
self._fails = 0
self.connect(Configuration.get(DatabaseSettings))
def connect(self, database_settings: DatabaseSettings):
try:
_logger.debug("Connecting to database")
self._pool = MySQLPool(
database_settings,
)
_logger.info("Connected to database")
except Exception as e:
_logger.fatal("Connecting to database failed", e)
async def execute(self, statement: str, args=None, multi=True) -> List[List]:
_logger.trace(f"execute {statement} with args: {args}")
return await self._pool.execute(statement, args, multi)
async def select_map(self, statement: str, args=None) -> List[Dict]:
_logger.trace(f"select {statement} with args: {args}")
try:
return await self._pool.select_map(statement, args)
except (MySQLError, PoolError) as e:
if self._fails >= 3:
_logger.error(f"Database error caused by `{statement}`", e)
uid = uuid.uuid4()
raise Exception(
f"Query failed three times with {type(e).__name__}. Contact an admin with the UID: {uid}"
)
_logger.error(f"Database error caused by `{statement}`", e)
self._fails += 1
try:
_logger.debug("Retry select")
return await self.select_map(statement, args)
except Exception as e:
pass
return []
except Exception as e:
_logger.error(f"Database error caused by `{statement}`", e)
raise e
async def select(self, statement: str, args=None) -> Union[List[str], List[Tuple], List[Any]]:
_logger.trace(f"select {statement} with args: {args}")
try:
return await self._pool.select(statement, args)
except (MySQLError, PoolError) as e:
if self._fails >= 3:
_logger.error(f"Database error caused by `{statement}`", e)
uid = uuid.uuid4()
raise Exception(
f"Query failed three times with {type(e).__name__}. Contact an admin with the UID: {uid}"
)
_logger.error(f"Database error caused by `{statement}`", e)
self._fails += 1
try:
_logger.debug("Retry select")
return await self.select(statement, args)
except Exception as e:
pass
return []
except Exception as e:
_logger.error(f"Database error caused by `{statement}`", e)
raise e

View File

@@ -0,0 +1,105 @@
from typing import Optional, Any
import sqlparse
import aiomysql
from cpl.core.environment import Environment
from cpl.database.db_logger import DBLogger
from cpl.database.model import DatabaseSettings
_logger = DBLogger(__name__)
class MySQLPool:
"""
Create a pool when connecting to MySQL, which will decrease the time spent in
requesting connection, creating connection, and closing connection.
"""
def __init__(self, database_settings: DatabaseSettings):
self._db_settings = database_settings
self.pool: Optional[aiomysql.Pool] = None
async def _get_pool(self):
if self.pool is None or self.pool._closed:
try:
self.pool = await aiomysql.create_pool(
host=self._db_settings.host,
port=self._db_settings.port,
user=self._db_settings.user,
password=self._db_settings.password,
db=self._db_settings.database,
minsize=1,
maxsize=Environment.get("DB_POOL_SIZE", int, 1),
autocommit=True,
)
except Exception as e:
_logger.fatal("Failed to connect to the database", e)
raise
return self.pool
@staticmethod
async def _exec_sql(cursor: Any, query: str, args=None, multi=True):
if multi:
queries = [str(stmt).strip() for stmt in sqlparse.parse(query) if str(stmt).strip()]
for q in queries:
if q.strip() == "":
continue
await cursor.execute(q, args)
else:
await cursor.execute(query, args)
async def execute(self, query: str, args=None, multi=True) -> list[list]:
"""
Execute a SQL statement, it could be with args and without args. The usage is
similar to the execute() function in aiomysql.
:param query: SQL clause
:param args: args needed by the SQL clause
:param multi: if the query is a multi-statement
:return: return result
"""
pool = await self._get_pool()
async with pool.acquire() as con:
async with con.cursor() as cursor:
await self._exec_sql(cursor, query, args, multi)
if cursor.description is not None: # Query returns rows
res = await cursor.fetchall()
if res is None:
return []
return [list(row) for row in res]
else:
return []
async def select(self, query: str, args=None, multi=True) -> list[str]:
"""
Execute a SQL statement, it could be with args and without args. The usage is
similar to the execute() function in aiomysql.
:param query: SQL clause
:param args: args needed by the SQL clause
:param multi: if the query is a multi-statement
:return: return result
"""
pool = await self._get_pool()
async with pool.acquire() as con:
async with con.cursor() as cursor:
await self._exec_sql(cursor, query, args, multi)
res = await cursor.fetchall()
return list(res)
async def select_map(self, query: str, args=None, multi=True) -> list[dict]:
"""
Execute a SQL statement, it could be with args and without args. The usage is
similar to the execute() function in aiomysql.
:param query: SQL clause
:param args: args needed by the SQL clause
:param multi: if the query is a multi-statement
:return: return result
"""
pool = await self._get_pool()
async with pool.acquire() as con:
async with con.cursor(aiomysql.DictCursor) as cursor:
await self._exec_sql(cursor, query, args, multi)
res = await cursor.fetchall()
return list(res)

View File

@@ -0,0 +1,86 @@
import uuid
from typing import Any
from psycopg import OperationalError
from psycopg_pool import PoolTimeout
from cpl.core.configuration import Configuration
from cpl.core.environment import Environment
from cpl.database.abc.db_context_abc import DBContextABC
from cpl.database.database_settings import DatabaseSettings
from cpl.database.db_logger import DBLogger
from cpl.database.postgres.postgres_pool import PostgresPool
_logger = DBLogger(__name__)
class DBContext(DBContextABC):
def __init__(self):
DBContextABC.__init__(self)
self._pool: PostgresPool = None
self._fails = 0
self.connect(Configuration.get(DatabaseSettings))
def connect(self, database_settings: DatabaseSettings):
try:
_logger.debug("Connecting to database")
self._pool = PostgresPool(
database_settings,
Environment.get("DB_POOL_SIZE", int, 1),
)
_logger.info("Connected to database")
except Exception as e:
_logger.fatal("Connecting to database failed", e)
async def execute(self, statement: str, args=None, multi=True) -> list[list]:
_logger.trace(f"execute {statement} with args: {args}")
return await self._pool.execute(statement, args, multi)
async def select_map(self, statement: str, args=None) -> list[dict]:
_logger.trace(f"select {statement} with args: {args}")
try:
return await self._pool.select_map(statement, args)
except (OperationalError, PoolTimeout) as e:
if self._fails >= 3:
_logger.error(f"Database error caused by `{statement}`", e)
uid = uuid.uuid4()
raise Exception(
f"Query failed three times with {type(e).__name__}. Contact an admin with the UID: {uid}"
)
_logger.error(f"Database error caused by `{statement}`", e)
self._fails += 1
try:
_logger.debug("Retry select")
return await self.select_map(statement, args)
except Exception as e:
pass
return []
except Exception as e:
_logger.error(f"Database error caused by `{statement}`", e)
raise e
async def select(self, statement: str, args=None) -> list[str] | list[tuple] | list[Any]:
_logger.trace(f"select {statement} with args: {args}")
try:
return await self._pool.select(statement, args)
except (OperationalError, PoolTimeout) as e:
if self._fails >= 3:
_logger.error(f"Database error caused by `{statement}`", e)
uid = uuid.uuid4()
raise Exception(
f"Query failed three times with {type(e).__name__}. Contact an admin with the UID: {uid}"
)
_logger.error(f"Database error caused by `{statement}`", e)
self._fails += 1
try:
_logger.debug("Retry select")
return await self.select(statement, args)
except Exception as e:
pass
return []
except Exception as e:
_logger.error(f"Database error caused by `{statement}`", e)
raise e

View File

@@ -0,0 +1,123 @@
from typing import Optional, Any
import sqlparse
from psycopg import sql
from psycopg_pool import AsyncConnectionPool, PoolTimeout
from cpl.core.environment import Environment
from cpl.database.db_logger import DBLogger
from cpl.database.model import DatabaseSettings
_logger = DBLogger(__name__)
class PostgresPool:
"""
Create a pool when connecting to PostgreSQL, which will decrease the time spent in
requesting connection, creating connection, and closing connection.
"""
def __init__(self, database_settings: DatabaseSettings):
self._conninfo = (
f"host={database_settings.host} "
f"port={database_settings.port} "
f"user={database_settings.user} "
f"password={database_settings.password} "
f"dbname={database_settings.database}"
)
self.pool: Optional[AsyncConnectionPool] = None
async def _get_pool(self):
pool = AsyncConnectionPool(
conninfo=self._conninfo, open=False, min_size=1, max_size=Environment.get("DB_POOL_SIZE", int, 1)
)
await pool.open()
try:
async with pool.connection() as con:
await pool.check_connection(con)
except PoolTimeout as e:
await pool.close()
_logger.fatal(f"Failed to connect to the database", e)
return pool
@staticmethod
async def _exec_sql(cursor: Any, query: str, args=None, multi=True):
if multi:
queries = [str(stmt).strip() for stmt in sqlparse.parse(query) if str(stmt).strip()]
for q in queries:
if q.strip() == "":
continue
await cursor.execute(sql.SQL(q), args)
else:
await cursor.execute(sql.SQL(query), args)
async def execute(self, query: str, args=None, multi=True) -> list[list]:
"""
Execute a SQL statement, it could be with args and without args. The usage is
similar to the execute() function in the psycopg module.
:param query: SQL clause
:param args: args needed by the SQL clause
:param multi: if the query is a multi-statement
:return: return result
"""
async with await self._get_pool() as pool:
async with pool.connection() as con:
async with con.cursor() as cursor:
await self._exec_sql(cursor, query, args, multi)
if cursor.description is not None: # Check if the query returns rows
res = await cursor.fetchall()
if res is None:
return []
result = []
for row in res:
result.append(list(row))
return result
else:
return []
async def select(self, query: str, args=None, multi=True) -> list[str]:
"""
Execute a SQL statement, it could be with args and without args. The usage is
similar to the execute() function in the psycopg module.
:param query: SQL clause
:param args: args needed by the SQL clause
:param multi: if the query is a multi-statement
:return: return result
"""
async with await self._get_pool() as pool:
async with pool.connection() as con:
async with con.cursor() as cursor:
await self._exec_sql(cursor, query, args, multi)
res = await cursor.fetchall()
return list(res)
async def select_map(self, query: str, args=None, multi=True) -> list[dict]:
"""
Execute a SQL statement, it could be with args and without args. The usage is
similar to the execute() function in the psycopg module.
:param query: SQL clause
:param args: args needed by the SQL clause
:param multi: if the query is a multi-statement
:return: return result
"""
async with await self._get_pool() as pool:
async with pool.connection() as con:
async with con.cursor() as cursor:
await self._exec_sql(cursor, query, args, multi)
res = await cursor.fetchall()
res_map: list[dict] = []
for i_res in range(len(res)):
cols = {}
for i_col in range(len(res[i_res])):
cols[cursor.description[i_col].name] = res[i_res][i_col]
res_map.append(cols)
return res_map

View File

@@ -0,0 +1,154 @@
from typing import Optional, Union
from cpl.database._external_data_temp_table_builder import ExternalDataTempTableBuilder
class SQLSelectBuilder:
def __init__(self, table_name: str, primary_key: str):
self._table_name = table_name
self._primary_key = primary_key
self._temp_tables: dict[str, ExternalDataTempTableBuilder] = {}
self._to_use_temp_tables: list[str] = []
self._attributes: list[str] = []
self._tables: list[str] = [table_name]
self._joins: dict[str, (str, str)] = {}
self._conditions: list[str] = []
self._order_by: str = ""
self._limit: Optional[int] = None
self._offset: Optional[int] = None
def with_temp_table(self, temp_table: ExternalDataTempTableBuilder) -> "SQLSelectBuilder":
self._temp_tables[temp_table.table_name] = temp_table
return self
def use_temp_table(self, temp_table_name: str):
if temp_table_name not in self._temp_tables:
raise ValueError(f"Temp table {temp_table_name} not found.")
self._to_use_temp_tables.append(temp_table_name)
def with_attribute(self, attr: str, ignore_table_name=False) -> "SQLSelectBuilder":
if not ignore_table_name and not attr.startswith(self._table_name):
attr = f"{self._table_name}.{attr}"
self._attributes.append(attr)
return self
def with_foreign_attribute(self, attr: str) -> "SQLSelectBuilder":
self._attributes.append(attr)
return self
def with_table(self, table_name: str) -> "SQLSelectBuilder":
self._tables.append(table_name)
return self
def _check_prefix(self, attr: str, foreign_tables: list[str]) -> str:
assert attr is not None
if "TO_CHAR" in attr:
return attr
valid_prefixes = [
"levenshtein",
self._table_name,
*self._joins.keys(),
*self._temp_tables.keys(),
*foreign_tables,
]
if not any(attr.startswith(f"{prefix}.") for prefix in valid_prefixes):
attr = f"{self._table_name}.{attr}"
return attr
def with_value_condition(
self, attr: str, operator: str, value: str, foreign_tables: list[str]
) -> "SQLSelectBuilder":
attr = self._check_prefix(attr, foreign_tables)
self._conditions.append(f"{attr} {operator} {value}")
return self
def with_levenshtein_condition(self, condition: str) -> "SQLSelectBuilder":
self._conditions.append(condition)
return self
def with_condition(self, attr: str, operator: str, foreign_tables: list[str]) -> "SQLSelectBuilder":
attr = self._check_prefix(attr, foreign_tables)
self._conditions.append(f"{attr} {operator}")
return self
def with_grouped_conditions(self, conditions: list[str]) -> "SQLSelectBuilder":
self._conditions.append(f"({' AND '.join(conditions)})")
return self
def with_left_join(self, table: str, on: str) -> "SQLSelectBuilder":
if table in self._joins:
self._joins[table] = (f"{self._joins[table][0]} AND {on}", "LEFT")
self._joins[table] = (on, "LEFT")
return self
def with_inner_join(self, table: str, on: str) -> "SQLSelectBuilder":
if table in self._joins:
self._joins[table] = (f"{self._joins[table][0]} AND {on}", "INNER")
self._joins[table] = (on, "INNER")
return self
def with_right_join(self, table: str, on: str) -> "SQLSelectBuilder":
if table in self._joins:
self._joins[table] = (f"{self._joins[table][0]} AND {on}", "RIGHT")
self._joins[table] = (on, "RIGHT")
return self
def with_limit(self, limit: int) -> "SQLSelectBuilder":
self._limit = limit
return self
def with_offset(self, offset: int) -> "SQLSelectBuilder":
self._offset = offset
return self
def with_order_by(self, column: Union[str, property], direction: str = "ASC") -> "SQLSelectBuilder":
if isinstance(column, property):
column = column.fget.__name__
self._order_by = f"{column} {direction}"
return self
async def _handle_temp_table_use(self, query) -> str:
new_query = ""
for temp_table_name in self._to_use_temp_tables:
temp_table = self._temp_tables[temp_table_name]
new_query += await self._temp_tables[temp_table_name].build()
self.with_left_join(
temp_table.table_name,
f"{temp_table.join_ref_table}.{self._primary_key} = {temp_table.table_name}.{temp_table.primary_key}",
)
return f"{new_query} {query}" if new_query != "" else query
async def build(self) -> str:
query = await self._handle_temp_table_use("")
attributes = ", ".join(self._attributes) if self._attributes else "*"
query += f"SELECT {attributes} FROM {", ".join(self._tables)}"
for join in self._joins:
query += f" {self._joins[join][1]} JOIN {join} ON {self._joins[join][0]}"
if self._conditions:
query += " WHERE " + " AND ".join(self._conditions)
if self._order_by:
query += f" ORDER BY {self._order_by}"
if self._limit is not None:
query += f" LIMIT {self._limit}"
if self._offset is not None:
query += f" OFFSET {self._offset}"
return query

View File

@@ -0,0 +1,18 @@
from datetime import datetime
from typing import Optional
from cpl.database.abc import DbModelABC
class ExecutedMigration(DbModelABC):
def __init__(
self,
migration_id: str,
created: Optional[datetime] = None,
modified: Optional[datetime] = None,
):
DbModelABC.__init__(self, migration_id, False, created, modified)
@property
def migration_id(self) -> str:
return self._id

View File

@@ -0,0 +1,14 @@
from cpl.database import InternalTables
from cpl.database.abc.data_access_object_abc import DataAccessObjectABC
from cpl.database.db_logger import DBLogger
from cpl.database.schema.executed_migration import ExecutedMigration
_logger = DBLogger(__name__)
class ExecutedMigrationDao(DataAccessObjectABC[ExecutedMigration]):
def __init__(self):
DataAccessObjectABC.__init__(self, __name__, ExecutedMigration, InternalTables.executed_migrations)
self.attribute(ExecutedMigration.migration_id, str, primary_key=True, db_name="migrationId")

View File

@@ -0,0 +1,6 @@
CREATE TABLE IF NOT EXISTS _executed_migrations
(
migrationId VARCHAR(255) PRIMARY KEY,
created TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);

View File

@@ -0,0 +1,26 @@
DELIMITER //
CREATE TRIGGER mytable_before_update
BEFORE UPDATE
ON mytable
FOR EACH ROW
BEGIN
INSERT INTO mytable_history
SELECT OLD.*;
SET NEW.updated = NOW();
END;
//
DELIMITER ;
DELIMITER //
CREATE TRIGGER mytable_before_delete
BEFORE DELETE
ON mytable
FOR EACH ROW
BEGIN
INSERT INTO mytable_history
SELECT OLD.*;
END;
//
DELIMITER ;

View File

@@ -0,0 +1,47 @@
CREATE SCHEMA IF NOT EXISTS public;
CREATE SCHEMA IF NOT EXISTS system;
CREATE TABLE IF NOT EXISTS system._executed_migrations
(
MigrationId VARCHAR(255) PRIMARY KEY,
Created timestamptz NOT NULL DEFAULT NOW(),
Updated timestamptz NOT NULL DEFAULT NOW()
);
CREATE OR REPLACE FUNCTION public.history_trigger_function()
RETURNS TRIGGER AS
$$
DECLARE
schema_name TEXT;
history_table_name TEXT;
BEGIN
-- Construct the name of the history table based on the current table
schema_name := TG_TABLE_SCHEMA;
history_table_name := TG_TABLE_NAME || '_history';
IF (TG_OP = 'INSERT') THEN
RETURN NEW;
END IF;
-- Insert the old row into the history table on UPDATE or DELETE
IF (TG_OP = 'UPDATE' OR TG_OP = 'DELETE') THEN
EXECUTE format(
'INSERT INTO %I.%I SELECT ($1).*',
schema_name,
history_table_name
)
USING OLD;
END IF;
-- For UPDATE, update the Updated column and return the new row
IF (TG_OP = 'UPDATE') THEN
NEW.updated := NOW(); -- Update the Updated column
RETURN NEW;
END IF;
-- For DELETE, return OLD to allow the deletion
IF (TG_OP = 'DELETE') THEN
RETURN OLD;
END IF;
END;
$$ LANGUAGE plpgsql;

View File

@@ -0,0 +1,111 @@
import glob
import os
from cpl.database.abc import DBContextABC
from cpl.database.db_logger import DBLogger
from cpl.database.model import Migration
from cpl.database.model.server_type import ServerType, ServerTypes
from cpl.database.schema.executed_migration import ExecutedMigration
from cpl.database.schema.executed_migration_dao import ExecutedMigrationDao
_logger = DBLogger(__name__)
class MigrationService:
def __init__(self, db: DBContextABC, executedMigrationDao: ExecutedMigrationDao):
self._db = db
self._executedMigrationDao = executedMigrationDao
self._script_directories: list[str] = []
if ServerType.server_type == ServerTypes.POSTGRES:
self.with_directory(os.path.join(os.path.dirname(os.path.realpath(__file__)), "../scripts/postgres"))
elif ServerType.server_type == ServerTypes.MYSQL:
self.with_directory(os.path.join(os.path.dirname(os.path.realpath(__file__)), "../scripts/mysql"))
def with_directory(self, directory: str) -> "MigrationService":
self._script_directories.append(directory)
return self
async def _get_migration_history(self) -> list[ExecutedMigration]:
results = await self._db.select(f"SELECT * FROM {self._executedMigrationDao.table_name}")
applied_migrations = []
for result in results:
applied_migrations.append(ExecutedMigration(result[0]))
return applied_migrations
@staticmethod
def _load_scripts_by_path(path: str) -> list[Migration]:
migrations = []
if not os.path.exists(path):
raise Exception("Migration path not found")
files = sorted(glob.glob(f"{path}/*"))
for file in files:
if not file.endswith(".sql"):
continue
name = str(file.split(".sql")[0])
if "/" in name:
name = name.split("/")[-1]
with open(f"{file}", "r") as f:
script = f.read()
f.close()
migrations.append(Migration(name, script))
return migrations
def _load_scripts(self) -> list[Migration]:
migrations = []
for path in self._script_directories:
migrations.extend(self._load_scripts_by_path(path))
return migrations
async def _get_tables(self):
if ServerType == ServerTypes.POSTGRES:
return await self._db.select(
"""
SELECT tablename
FROM pg_tables
WHERE schemaname = 'public';
"""
)
else:
return await self._db.select(
"""
SHOW TABLES;
"""
)
async def _execute(self, migrations: list[Migration]):
result = await self._get_tables()
for migration in migrations:
active_statement = ""
try:
# check if table exists
if len(result) > 0:
migration_from_db = await self._executedMigrationDao.find_by_id(migration.name)
if migration_from_db is not None:
continue
_logger.debug(f"Running upgrade migration: {migration.name}")
await self._db.execute(migration.script, multi=True)
await self._executedMigrationDao.create(ExecutedMigration(migration.name), skip_editor=True)
except Exception as e:
_logger.fatal(
f"Migration failed: {migration.name}\n{active_statement}",
e,
)
async def migrate(self):
await self._execute(self._load_scripts())

View File

@@ -0,0 +1,65 @@
from datetime import datetime
from typing import TypeVar, Union, Literal, Any
from cpl.database.abc.db_model_abc import DbModelABC
T_DBM = TypeVar("T_DBM", bound=DbModelABC)
NumberFilterOperator = Literal[
"equal",
"notEqual",
"greater",
"greaterOrEqual",
"less",
"lessOrEqual",
"isNull",
"isNotNull",
]
StringFilterOperator = Literal[
"equal",
"notEqual",
"contains",
"notContains",
"startsWith",
"endsWith",
"isNull",
"isNotNull",
]
BoolFilterOperator = Literal[
"equal",
"notEqual",
"isNull",
"isNotNull",
]
DateFilterOperator = Literal[
"equal",
"notEqual",
"greater",
"greaterOrEqual",
"less",
"lessOrEqual",
"isNull",
"isNotNull",
]
FilterOperator = Union[NumberFilterOperator, StringFilterOperator, BoolFilterOperator, DateFilterOperator]
Attribute = Union[str, property]
AttributeCondition = Union[
dict[NumberFilterOperator, int],
dict[StringFilterOperator, str],
dict[BoolFilterOperator, bool],
dict[DateFilterOperator, datetime],
]
AttributeFilter = dict[Attribute, Union[list[Union[AttributeCondition, Any]], AttributeCondition, Any]]
AttributeFilters = Union[
list[AttributeFilter],
AttributeFilter,
]
AttributeSort = dict[Attribute, Literal["asc", "desc"]]
AttributeSorts = Union[
list[AttributeSort],
AttributeSort,
]

View File

@@ -0,0 +1,30 @@
[build-system]
requires = ["setuptools>=70.1.0", "wheel>=0.43.0"]
build-backend = "setuptools.build_meta"
[project]
name = "cpl-database"
version = "2024.7.0"
description = "CPL database"
readme ="CPL database package"
requires-python = ">=3.12"
license = { text = "MIT" }
authors = [
{ name = "Sven Heidemann", email = "sven.heidemann@sh-edraft.de" }
]
keywords = ["cpl", "database", "backend", "shared", "library"]
dynamic = ["dependencies", "optional-dependencies"]
[project.urls]
Homepage = "https://www.sh-edraft.de"
[tool.setuptools.packages.find]
where = ["."]
include = ["cpl*"]
[tool.setuptools.dynamic]
dependencies = { file = ["requirements.txt"] }
optional-dependencies.dev = { file = ["requirements.dev.txt"] }

View File

@@ -0,0 +1 @@
black==25.1.0

View File

@@ -0,0 +1,8 @@
cpl-core
cpl-dependency
psycopg[binary]==3.2.3
psycopg-pool==3.2.4
sqlparse==0.5.3
mysql-connector-python==9.4.0
async-property==0.2.2
aiomysql==0.2.0

View File

@@ -0,0 +1,7 @@
from .scope import Scope
from .scope_abc import ScopeABC
from .service_collection import ServiceCollection
from .service_descriptor import ServiceDescriptor
from .service_lifetime_enum import ServiceLifetimeEnum
from .service_provider import ServiceProvider
from .service_provider_abc import ServiceProviderABC

View File

@@ -1,6 +1,5 @@
from cpl_core.console.console import Console
from cpl_core.dependency_injection.scope_abc import ScopeABC
from cpl_core.dependency_injection.service_provider_abc import ServiceProviderABC
from cpl.dependency.scope_abc import ScopeABC
from cpl.dependency.service_provider_abc import ServiceProviderABC
class Scope(ScopeABC):

View File

@@ -2,7 +2,7 @@ from abc import ABC, abstractmethod
class ScopeABC(ABC):
r"""ABC for the class :class:`cpl_core.dependency_injection.scope.Scope`"""
r"""ABC for the class :class:`cpl.dependency.scope.Scope`"""
def __init__(self):
pass
@@ -13,11 +13,9 @@ class ScopeABC(ABC):
r"""Returns to service provider of scope
Returns:
Object of type :class:`cpl_core.dependency_injection.service_provider_abc.ServiceProviderABC`
Object of type :class:`cpl.dependency.service_provider_abc.ServiceProviderABC`
"""
pass
@abstractmethod
def dispose(self):
r"""Sets service_provider to None"""
pass

View File

@@ -0,0 +1,18 @@
from cpl.dependency.scope import Scope
from cpl.dependency.scope_abc import ScopeABC
from cpl.dependency.service_provider_abc import ServiceProviderABC
class ScopeBuilder:
r"""Class to build :class:`cpl.dependency.scope.Scope`"""
def __init__(self, service_provider: ServiceProviderABC) -> None:
self._service_provider = service_provider
def build(self) -> ScopeABC:
r"""Returns scope
Returns:
Object of type :class:`cpl.dependency.scope.Scope`
"""
return Scope(self._service_provider)

View File

@@ -1,27 +1,26 @@
from typing import Union, Type, Callable, Optional
from cpl_core.configuration.configuration_abc import ConfigurationABC
from cpl_core.database.context.database_context_abc import DatabaseContextABC
from cpl_core.database.database_settings import DatabaseSettings
from cpl_core.dependency_injection.service_collection_abc import ServiceCollectionABC
from cpl_core.dependency_injection.service_descriptor import ServiceDescriptor
from cpl_core.dependency_injection.service_lifetime_enum import ServiceLifetimeEnum
from cpl_core.dependency_injection.service_provider import ServiceProvider
from cpl_core.dependency_injection.service_provider_abc import ServiceProviderABC
from cpl_core.logging.logger_abc import LoggerABC
from cpl_core.logging.logger_service import Logger
from cpl_core.pipes.pipe_abc import PipeABC
from cpl_core.type import T
from cpl.core.log.logger import Logger
from cpl.core.log.logger_abc import LoggerABC
from cpl.core.pipes.pipe_abc import PipeABC
from cpl.core.typing import T, Service
from cpl.dependency.service_descriptor import ServiceDescriptor
from cpl.dependency.service_lifetime_enum import ServiceLifetimeEnum
from cpl.dependency.service_provider import ServiceProvider
from cpl.dependency.service_provider_abc import ServiceProviderABC
class ServiceCollection(ServiceCollectionABC):
class ServiceCollection:
r"""Representation of the collection of services"""
def __init__(self, config: ConfigurationABC):
ServiceCollectionABC.__init__(self)
self._configuration: ConfigurationABC = config
_modules: dict[str, Callable] = {}
self._database_context: Optional[DatabaseContextABC] = None
@classmethod
def with_module(cls, func: Callable, name: str = None):
cls._modules[func.__name__ if name is None else name] = func
return cls
def __init__(self):
self._service_descriptors: list[ServiceDescriptor] = []
def _add_descriptor(self, service: Union[type, object], lifetime: ServiceLifetimeEnum, base_type: Callable = None):
@@ -47,13 +46,22 @@ class ServiceCollection(ServiceCollectionABC):
return self
def add_db_context(self, db_context_type: Type[DatabaseContextABC], db_settings: DatabaseSettings):
self.add_singleton(DatabaseContextABC, db_context_type)
self._database_context = self.build_service_provider().get_service(DatabaseContextABC)
self._database_context.connect(db_settings)
def add_module(self, module: str | object):
if not isinstance(module, str):
module = module.__name__
if module not in self._modules:
raise ValueError(f"Module {module} not found")
self._modules[module](self)
# def add_mysql(self, db_context_type: Type[DatabaseContextABC], db_settings: DatabaseSettings):
# self.add_singleton(DatabaseContextABC, db_context_type)
# self._database_context = self.build_service_provider().get_service(DatabaseContextABC)
# self._database_context.connect(db_settings)
def add_logging(self):
self.add_singleton(LoggerABC, Logger)
self.add_transient(LoggerABC, Logger)
return self
def add_pipes(self):
@@ -61,19 +69,19 @@ class ServiceCollection(ServiceCollectionABC):
self.add_transient(PipeABC, pipe)
return self
def add_singleton(self, service_type: T, service: T = None):
def add_singleton(self, service_type: T, service: Service = None):
self._add_descriptor_by_lifetime(service_type, ServiceLifetimeEnum.singleton, service)
return self
def add_scoped(self, service_type: T, service: T = None):
def add_scoped(self, service_type: T, service: Service = None):
self._add_descriptor_by_lifetime(service_type, ServiceLifetimeEnum.scoped, service)
return self
def add_transient(self, service_type: T, service: T = None):
def add_transient(self, service_type: T, service: Service = None):
self._add_descriptor_by_lifetime(service_type, ServiceLifetimeEnum.transient, service)
return self
def build_service_provider(self) -> ServiceProviderABC:
sp = ServiceProvider(self._service_descriptors, self._configuration, self._database_context)
sp = ServiceProvider(self._service_descriptors)
ServiceProviderABC.set_global_provider(sp)
return sp

View File

@@ -1,7 +1,6 @@
from typing import Union, Optional
from cpl_core.console import Console
from cpl_core.dependency_injection.service_lifetime_enum import ServiceLifetimeEnum
from cpl.dependency.service_lifetime_enum import ServiceLifetimeEnum
class ServiceDescriptor:
@@ -10,7 +9,7 @@ class ServiceDescriptor:
Parameter:
implementation: Union[:class:`type`, Optional[:class:`object`]]
Object or type of service
lifetime: :class:`cpl_core.dependency_injection.service_lifetime_enum.ServiceLifetimeEnum`
lifetime: :class:`cpl.dependency.service_lifetime_enum.ServiceLifetimeEnum`
Lifetime of the service
"""

Some files were not shown because too many files have changed in this diff Show More