Compare commits

..

2 Commits

Author SHA1 Message Date
b6cf5962aa Merge pull request 'Removed tools & docs' (#197) from dev_cleanup into master
Reviewed-on: #197
2025-10-08 21:30:47 +02:00
d3084041a9 Removed tools & docs 2025-10-08 21:30:01 +02:00
707 changed files with 15036 additions and 8907 deletions

View File

@@ -1,76 +0,0 @@
name: Build on push
run-name: Build on push
on:
push:
branches:
- dev
jobs:
prepare:
uses: ./.gitea/workflows/prepare.yaml
with:
version_suffix: 'dev'
secrets: inherit
api:
uses: ./.gitea/workflows/package.yaml
needs: [ prepare, application, auth, core, dependency ]
with:
working_directory: src/cpl-api
secrets: inherit
application:
uses: ./.gitea/workflows/package.yaml
needs: [ prepare, core, dependency ]
with:
working_directory: src/cpl-application
secrets: inherit
auth:
uses: ./.gitea/workflows/package.yaml
needs: [ prepare, core, dependency, database ]
with:
working_directory: src/cpl-auth
secrets: inherit
core:
uses: ./.gitea/workflows/package.yaml
needs: [prepare]
with:
working_directory: src/cpl-core
secrets: inherit
database:
uses: ./.gitea/workflows/package.yaml
needs: [ prepare, core, dependency ]
with:
working_directory: src/cpl-database
secrets: inherit
dependency:
uses: ./.gitea/workflows/package.yaml
needs: [ prepare, core ]
with:
working_directory: src/cpl-dependency
secrets: inherit
mail:
uses: ./.gitea/workflows/package.yaml
needs: [ prepare, core, dependency ]
with:
working_directory: src/cpl-mail
secrets: inherit
query:
uses: ./.gitea/workflows/package.yaml
needs: [prepare]
with:
working_directory: src/cpl-query
secrets: inherit
translation:
uses: ./.gitea/workflows/package.yaml
needs: [ prepare, core, dependency ]
with:
working_directory: src/cpl-translation
secrets: inherit

View File

@@ -1,39 +0,0 @@
name: Build on push
run-name: Build on push
on:
push:
branches:
- master
jobs:
prepare:
uses: ./.gitea/workflows/prepare.yaml
secrets: inherit
core:
uses: ./.gitea/workflows/package.yaml
needs: [prepare]
with:
working_directory: src/cpl-core
secrets: inherit
query:
uses: ./.gitea/workflows/package.yaml
needs: [prepare]
with:
working_directory: src/cpl-query
secrets: inherit
translation:
uses: ./.gitea/workflows/package.yaml
needs: [ prepare, core ]
with:
working_directory: src/cpl-translation
secrets: inherit
mail:
uses: ./.gitea/workflows/package.yaml
needs: [ prepare, core ]
with:
working_directory: src/cpl-mail
secrets: inherit

View File

@@ -1,65 +0,0 @@
name: Build Package
run-name: Build Python Package
on:
workflow_call:
inputs:
version_suffix:
description: 'Suffix for version (z.B. "dev", "alpha", "beta")'
required: false
type: string
working_directory:
required: true
type: string
jobs:
build:
runs-on: [ runner ]
container: git.sh-edraft.de/sh-edraft.de/act-runner:latest
defaults:
run:
working-directory: ${{ inputs.working_directory }}
steps:
- name: Clone Repository
uses: https://github.com/actions/checkout@v3
with:
token: ${{ secrets.CI_ACCESS_TOKEN }}
- name: Download build version artifact
uses: actions/download-artifact@v3
with:
name: version
- name: Set version
run: |
sed -i -E "s/^version = \".*\"/version = \"$(cat /workspace/sh-edraft.de/cpl/version.txt)\"/" pyproject.toml
echo "Set version to $(cat /workspace/sh-edraft.de/cpl/version.txt)"
cat pyproject.toml
- name: Set pip conf
run: |
cat > .pip.conf <<'EOF'
[global]
extra-index-url = https://git.sh-edraft.de/api/packages/sh-edraft.de/pypi/simple/
EOF
- name: Install Dependencies
run: |
export PIP_CONFIG_FILE=".pip.conf"
pip install build
- name: Build Package
run: |
python -m build --outdir dist
- name: Login to registry git.sh-edraft.de
uses: https://github.com/docker/login-action@v1
with:
registry: git.sh-edraft.de
username: ${{ secrets.CI_USERNAME }}
password: ${{ secrets.CI_ACCESS_TOKEN }}
- name: Push image
run: |
pip install twine
python -m twine upload --repository-url https://git.sh-edraft.de/api/packages/sh-edraft.de/pypi -u ${{ secrets.CI_USERNAME }} -p ${{ secrets.CI_ACCESS_TOKEN }} ./dist/*

View File

@@ -1,54 +0,0 @@
name: Prepare Build
run-name: Prepare Build Version
on:
workflow_call:
inputs:
version_suffix:
description: 'Suffix for version (z.B. "dev", "alpha", "beta")'
required: false
type: string
jobs:
prepare:
runs-on: [ runner ]
container: git.sh-edraft.de/sh-edraft.de/act-runner:latest
steps:
- name: Clone Repository
uses: https://github.com/actions/checkout@v3
with:
token: ${{ secrets.CI_ACCESS_TOKEN }}
- name: Get Date and Build Number
run: |
git fetch --tags
git tag
DATE=$(date +'%Y.%m.%d')
TAG_COUNT=$(git tag -l "${DATE}.*" | wc -l)
BUILD_NUMBER=$(($TAG_COUNT + 1))
VERSION_SUFFIX=${{ inputs.version_suffix }}
if [ -n "$VERSION_SUFFIX" ] && [ "$VERSION_SUFFIX" = "dev" ]; then
BUILD_VERSION="${DATE}.dev${BUILD_NUMBER}"
elif [ -n "$VERSION_SUFFIX" ]; then
BUILD_VERSION="${DATE}.${BUILD_NUMBER}${VERSION_SUFFIX}"
else
BUILD_VERSION="${DATE}.${BUILD_NUMBER}"
fi
echo "$BUILD_VERSION" > version.txt
echo "VERSION $BUILD_VERSION"
- name: Create Git Tag for Build
run: |
git config user.name "ci"
git config user.email "dev@sh-edraft.de"
echo "tag $(cat version.txt)"
git tag $(cat version.txt)
git push origin --tags
- name: Upload build version artifact
uses: actions/upload-artifact@v3
with:
name: version
path: version.txt

View File

@@ -1,26 +0,0 @@
name: Test before pr merge
run-name: Test before pr merge
on:
pull_request:
types:
- opened
- edited
- reopened
- synchronize
- ready_for_review
jobs:
test-lint:
runs-on: [ runner ]
container: git.sh-edraft.de/sh-edraft.de/act-runner:latest
steps:
- name: Clone Repository
uses: https://github.com/actions/checkout@v3
with:
token: ${{ secrets.CI_ACCESS_TOKEN }}
- name: Installing black
run: python3.12 -m pip install black
- name: Checking black
run: python3.12 -m black src --check

4
.gitignore vendored
View File

@@ -113,7 +113,6 @@ venv.bak/
# Custom Environments
cpl-env/
.secret
# Spyder project settings
.spyderproject
@@ -139,6 +138,3 @@ PythonImportHelper-v2-Completion.json
# cpl unittest stuff
unittests/test_*_playground
# cpl logs
**/logs/*.jsonl

View File

@@ -1,2 +0,0 @@
[global]
extra-index-url = https://git.sh-edraft.de/api/packages/sh-edraft.de/pypi/simple/

153
README.md
View File

@@ -0,0 +1,153 @@
<h1 align="center">CPL - Common python library</h1>
<!-- Summary -->
<p align="center">
<!-- <img src="" alt="cpl-logo" width="120px" height="120px"/> -->
<br>
<i>
CPL is a development platform for python server applications
<br>using Python.</i>
<br>
</p>
## Table of Contents
<!-- TABLE OF CONTENTS -->
<ol>
<li><a href="#Features">Features</a></li>
<li>
<a href="#getting-started">Getting Started</a>
<ul>
<li><a href="#prerequisites">Prerequisites</a></li>
<li><a href="#installation">Installation</a></li>
</ul>
</li>
<li><a href="#roadmap">Roadmap</a></li>
<li><a href="#contributing">Contributing</a></li>
<li><a href="#license">License</a></li>
<li><a href="#contact">Contact</a></li>
</ol>
## Features
<!-- FEATURE OVERVIEW -->
- Expandle
- Application base
- Standardized application classes
- Application object builder
- Application extension classes
- Startup classes
- Startup extension classes
- Configuration
- Configure via object mapped JSON
- Console argument handling
- Console class for in and output
- Banner
- Spinner
- Options (menu)
- Table
- Write
- Write_at
- Write_line
- Write_line_at
- Dependency injection
- Service lifetimes: singleton, scoped and transient
- Providing of application environment
- Environment (development, staging, testing, production)
- Appname
- Customer
- Hostname
- Runtime directory
- Working directory
- Logging
- Standardized logger
- Log-level (FATAL, ERROR, WARN, INFO, DEBUG & TRACE)
- Mail handling
- Send mails
- Pipe classes
- Convert input
- Utils
- Credential manager
- Encryption via BASE64
- PIP wrapper class based on subprocess
- Run pip commands
- String converter to different variants
- to_lower_case
- to_camel_case
- ...
<!-- GETTING STARTED -->
## Getting Started
[Get started with CPL][quickstart].
### Prerequisites
- Install [python] which includes [Pip installs packages][pip]
### Installation
Install the CPL package
```sh
pip install cpl-core --extra-index-url https://pip.sh-edraft.de
```
Install the CPL CLI
```sh
pip install cpl-cli --extra-index-url https://pip.sh-edraft.de
```
Create workspace:
```sh
cpl new <console|library|unittest> <PROJECT NAME>
```
Run the application:
```sh
cd <PROJECT NAME>
cpl start
```
<!-- ROADMAP -->
## Roadmap
See the [open issues](https://git.sh-edraft.de/sh-edraft.de/sh_cpl/issues) for a list of proposed features (and known issues).
<!-- CONTRIBUTING -->
## Contributing
### Contributing Guidelines
Read through our [contributing guidelines][contributing] to learn about our submission process, coding rules and more.
### Want to Help?
Want to file a bug, contribute some code, or improve documentation? Excellent! Read up on our guidelines for [contributing][contributing].
<!-- LICENSE -->
## License
Distributed under the MIT License. See [LICENSE] for more information.
<!-- CONTACT -->
## Contact
Sven Heidemann - sven.heidemann@sh-edraft.de
Project link: [https://git.sh-edraft.de/sh-edraft.de/sh_common_py_lib](https://git.sh-edraft.de/sh-edraft.de/sh_cpl)
<!-- External LINKS -->
[pip_url]: https://pip.sh-edraft.de
[python]: https://www.python.org/
[pip]: https://pypi.org/project/pip/
<!-- Internal LINKS -->
[project]: https://git.sh-edraft.de/sh-edraft.de/sh_cpl
[quickstart]: https://git.sh-edraft.de/sh-edraft.de/sh_cpl/wiki/quickstart
[contributing]: https://git.sh-edraft.de/sh-edraft.de/sh_cpl/wiki/contributing
[license]: LICENSE

151
cpl-workspace.json Normal file
View File

@@ -0,0 +1,151 @@
{
"WorkspaceSettings": {
"DefaultProject": "cpl-core",
"Projects": {
"cpl-cli": "src/cpl_cli/cpl-cli.json",
"cpl-core": "src/cpl_core/cpl-core.json",
"cpl-discord": "src/cpl_discord/cpl-discord.json",
"cpl-query": "src/cpl_query/cpl-query.json",
"cpl-translation": "src/cpl_translation/cpl-translation.json",
"set-version": "tools/set_version/set-version.json",
"set-pip-urls": "tools/set_pip_urls/set-pip-urls.json",
"unittests": "unittests/unittests/unittests.json",
"unittests_cli": "unittests/unittests_cli/unittests_cli.json",
"unittests_core": "unittests/unittests_core/unittests_core.json",
"unittests_query": "unittests/unittests_query/unittests_query.json",
"unittests_shared": "unittests/unittests_shared/unittests_shared.json",
"unittests_translation": "unittests/unittests_translation/unittests_translation.json"
},
"Scripts": {
"hello-world": "echo 'Hello World'",
"format": "echo 'Formatting:'; black ./",
"sv": "cpl set-version",
"set-version": "cpl run set-version --dev $ARGS; echo '';",
"spu": "cpl set-pip-urls",
"set-pip-urls": "cpl run set-pip-urls --dev $ARGS; echo '';",
"docs-build": "cpl format; echo 'Build Documentation'; cpl db-core; cpl db-discord; cpl db-query; cpl db-translation; cd docs/; make clean; make html;",
"db-core": "cd docs/; sphinx-apidoc -o source/ ../src/cpl_core; cd ../",
"db-discord": "cd docs/; sphinx-apidoc -o source/ ../src/cpl_discord; cd ../",
"db-query": "cd docs/; sphinx-apidoc -o source/ ../src/cpl_query; cd ../",
"db-translation": "cd docs/; sphinx-apidoc -o source/ ../src/cpl_translation; cd ../",
"db": "cpl docs-build",
"docs-open": "xdg-open $PWD/docs/build/html/index.html &",
"do": "cpl docs-open",
"test": "cpl run unittests",
"pre-build-all": "cpl sv $ARGS; cpl spu $ARGS;",
"build-all": "cpl build-cli; cpl build-core; cpl build-discord; cpl build-query; cpl build-translation; cpl build-set-pip-urls; cpl build-set-version",
"ba": "cpl build-all $ARGS",
"build-cli": "echo 'Build cpl-cli'; cd ./src/cpl_cli; cpl build; cd ../../;",
"build-core": "echo 'Build cpl-core'; cd ./src/cpl_core; cpl build; cd ../../;",
"build-discord": "echo 'Build cpl-discord'; cd ./src/cpl_discord; cpl build; cd ../../;",
"build-query": "echo 'Build cpl-query'; cd ./src/cpl_query; cpl build; cd ../../;",
"build-translation": "echo 'Build cpl-translation'; cd ./src/cpl_translation; cpl build; cd ../../;",
"build-set-pip-urls": "echo 'Build set-pip-urls'; cd ./tools/set_pip_urls; cpl build; cd ../../;",
"build-set-version": "echo 'Build set-version'; cd ./tools/set_version; cpl build; cd ../../;",
"pre-publish-all": "cpl sv $ARGS; cpl spu $ARGS;",
"publish-all": "cpl publish-cli; cpl publish-core; cpl publish-discord; cpl publish-query; cpl publish-translation;",
"pa": "cpl publish-all $ARGS",
"publish-cli": "echo 'Publish cpl-cli'; cd ./src/cpl_cli; cpl publish; cd ../../;",
"publish-core": "echo 'Publish cpl-core'; cd ./src/cpl_core; cpl publish; cd ../../;",
"publish-discord": "echo 'Publish cpl-discord'; cd ./src/cpl_discord; cpl publish; cd ../../;",
"publish-query": "echo 'Publish cpl-query'; cd ./src/cpl_query; cpl publish; cd ../../;",
"publish-translation": "echo 'Publish cpl-translation'; cd ./src/cpl_translation; cpl publish; cd ../../;",
"upload-prod-cli": "echo 'PROD Upload cpl-cli'; cpl upl-prod-cli;",
"upl-prod-cli": "twine upload -r pip.sh-edraft.de dist/cpl-cli/publish/setup/*",
"upload-prod-core": "echo 'PROD Upload cpl-core'; cpl upl-prod-core;",
"upl-prod-core": "twine upload -r pip.sh-edraft.de dist/cpl-core/publish/setup/*",
"upload-prod-discord": "echo 'PROD Upload cpl-discord'; cpl upl-prod-discord;",
"upl-prod-discord": "twine upload -r pip.sh-edraft.de dist/cpl-discord/publish/setup/*",
"upload-prod-query": "echo 'PROD Upload cpl-query'; cpl upl-prod-query;",
"upl-prod-query": "twine upload -r pip.sh-edraft.de dist/cpl-query/publish/setup/*",
"upload-prod-translation": "echo 'PROD Upload cpl-translation'; cpl upl-prod-translation;",
"upl-prod-translation": "twine upload -r pip.sh-edraft.de dist/cpl-translation/publish/setup/*",
"upload-exp-cli": "echo 'EXP Upload cpl-cli'; cpl upl-exp-cli;",
"upl-exp-cli": "twine upload -r pip-exp.sh-edraft.de dist/cpl-cli/publish/setup/*",
"upload-exp-core": "echo 'EXP Upload cpl-core'; cpl upl-exp-core;",
"upl-exp-core": "twine upload -r pip-exp.sh-edraft.de dist/cpl-core/publish/setup/*",
"upload-exp-discord": "echo 'EXP Upload cpl-discord'; cpl upl-exp-discord;",
"upl-exp-discord": "twine upload -r pip-exp.sh-edraft.de dist/cpl-discord/publish/setup/*",
"upload-exp-query": "echo 'EXP Upload cpl-query'; cpl upl-exp-query;",
"upl-exp-query": "twine upload -r pip-exp.sh-edraft.de dist/cpl-query/publish/setup/*",
"upload-exp-translation": "echo 'EXP Upload cpl-translation'; cpl upl-exp-translation;",
"upl-exp-translation": "twine upload -r pip-exp.sh-edraft.de dist/cpl-translation/publish/setup/*",
"upload-dev-cli": "echo 'DEV Upload cpl-cli'; cpl upl-dev-cli;",
"upl-dev-cli": "twine upload -r pip-dev.sh-edraft.de dist/cpl-cli/publish/setup/*",
"upload-dev-core": "echo 'DEV Upload cpl-core'; cpl upl-dev-core;",
"upl-dev-core": "twine upload -r pip-dev.sh-edraft.de dist/cpl-core/publish/setup/*",
"upload-dev-discord": "echo 'DEV Upload cpl-discord'; cpl upl-dev-discord;",
"upl-dev-discord": "twine upload -r pip-dev.sh-edraft.de dist/cpl-discord/publish/setup/*",
"upload-dev-query": "echo 'DEV Upload cpl-query'; cpl upl-dev-query;",
"upl-dev-query": "twine upload -r pip-dev.sh-edraft.de dist/cpl-query/publish/setup/*",
"upload-dev-translation": "echo 'DEV Upload cpl-translation'; cpl upl-dev-translation;",
"upl-dev-translation": "twine upload -r pip-dev.sh-edraft.de dist/cpl-translation/publish/setup/*",
"pre-deploy-prod": "cpl sv $ARGS; cpl spu --environment=production;",
"deploy-prod": "cpl deploy-prod-cli; cpl deploy-prod-core; cpl deploy-prod-discord; cpl deploy-prod-query; cpl deploy-prod-translation;",
"dp": "cpl deploy-prod $ARGS",
"deploy-prod-cli": "cpl publish-cli; cpl upload-prod-cli",
"deploy-prod-core": "cpl publish-core; cpl upload-prod-core",
"deploy-prod-query": "cpl publish-query; cpl upload-prod-query",
"deploy-prod-discord": "cpl publish-discord; cpl upload-prod-discord",
"deploy-prod-translation": "cpl publish-translation; cpl upload-prod-translation",
"pre-deploy-exp": "cpl sv $ARGS; cpl spu --environment=staging;",
"deploy-exp": "cpl deploy-exp-cli; cpl deploy-exp-core; cpl deploy-exp-discord; cpl deploy-exp-query; cpl deploy-exp-translation;",
"de": "cpl deploy-exp $ARGS",
"deploy-exp-cli": "cpl publish-cli; cpl upload-exp-cli",
"deploy-exp-core": "cpl publish-core; cpl upload-exp-core",
"deploy-exp-discord": "cpl publish-discord; cpl upload-exp-discord",
"deploy-exp-query": "cpl publish-query; cpl upload-exp-query",
"deploy-exp-translation": "cpl publish-translation; cpl upload-exp-translation",
"pre-deploy-dev": "cpl sv $ARGS; cpl spu --environment=development;",
"deploy-dev": "cpl deploy-dev-cli; cpl deploy-dev-core; cpl deploy-dev-discord; cpl deploy-dev-query; cpl deploy-dev-translation;",
"dd": "cpl deploy-dev $ARGS",
"deploy-dev-cli": "cpl publish-cli; cpl upload-dev-cli",
"deploy-dev-core": "cpl publish-core; cpl upload-dev-core",
"deploy-dev-discord": "cpl publish-discord; cpl upload-dev-discord",
"deploy-dev-query": "cpl publish-query; cpl upload-dev-query",
"deploy-dev-translation": "cpl publish-query; cpl upload-dev-translation",
"dev-install": "cpl di-core; cpl di-cli; cpl di-query; cpl di-translation;",
"di": "cpl dev-install",
"di-core": "pip install cpl-core --pre --upgrade --extra-index-url https://pip-dev.sh-edraft.de",
"di-cli": "pip install cpl-cli --pre --upgrade --extra-index-url https://pip-dev.sh-edraft.de",
"di-discord": "pip install cpl-discord --pre --upgrade --extra-index-url https://pip-dev.sh-edraft.de",
"di-query": "pip install cpl-query --pre --upgrade --extra-index-url https://pip-dev.sh-edraft.de",
"di-translation": "pip install cpl-translation --pre --upgrade --extra-index-url https://pip-dev.sh-edraft.de",
"prod-install": "cpl pi-core; cpl pi-cli; cpl pi-query; cpl pi-translation;",
"pi": "cpl prod-install",
"pi-core": "pip install cpl-core --pre --upgrade --extra-index-url https://pip.sh-edraft.de",
"pi-cli": "pip install cpl-cli --pre --upgrade --extra-index-url https://pip.sh-edraft.de",
"pi-discord": "pip install cpl-discord --pre --upgrade --extra-index-url https://pip.sh-edraft.de",
"pi-query": "pip install cpl-query --pre --upgrade --extra-index-url https://pip.sh-edraft.de",
"pi-translation": "pip install cpl-translation --pre --upgrade --extra-index-url https://pip.sh-edraft.de"
}
}
}

View File

@@ -1,8 +0,0 @@
{
"Logging": {
"Path": "logs/",
"Filename": "log_$start_time.log",
"ConsoleLevel": "TRACE",
"Level": "TRACE"
}
}

View File

@@ -1,26 +0,0 @@
{
"TimeFormat": {
"DateFormat": "%Y-%m-%d",
"TimeFormat": "%H:%M:%S",
"DateTimeFormat": "%Y-%m-%d %H:%M:%S.%f",
"DateTimeLogFormat": "%Y-%m-%d_%H-%M-%S"
},
"Log": {
"Path": "logs/",
"Filename": "log_$start_time.log",
"ConsoleLevel": "TRACE",
"Level": "TRACE"
},
"Database": {
"Host": "localhost",
"User": "cpl",
"Port": 3306,
"Password": "cpl",
"Database": "cpl",
"Charset": "utf8mb4",
"UseUnicode": "true",
"Buffered": "true"
}
}

View File

@@ -1,47 +0,0 @@
from starlette.responses import JSONResponse
from cpl import api
from cpl.api.application.web_app import WebApp
from cpl.application import ApplicationBuilder
from cpl.auth.permission.permissions import Permissions
from cpl.auth.schema import AuthUser, Role
from cpl.core.configuration import Configuration
from cpl.core.environment import Environment
from cpl.core.utils.cache import Cache
from service import PingService
def main():
builder = ApplicationBuilder[WebApp](WebApp)
Configuration.add_json_file(f"appsettings.json")
Configuration.add_json_file(f"appsettings.{Environment.get_environment()}.json")
Configuration.add_json_file(f"appsettings.{Environment.get_host_name()}.json", optional=True)
# builder.services.add_logging()
builder.services.add_structured_logging()
builder.services.add_transient(PingService)
builder.services.add_module(api)
builder.services.add_cache(AuthUser)
builder.services.add_cache(Role)
app = builder.build()
app.with_logging()
app.with_database()
app.with_authentication()
app.with_authorization()
app.with_route(path="/route1", fn=lambda r: JSONResponse("route1"), method="GET", authentication=True, permissions=[Permissions.administrator])
app.with_routes_directory("routes")
provider = builder.service_provider
user_cache = provider.get_service(Cache[AuthUser])
role_cache = provider.get_service(Cache[Role])
app.run()
if __name__ == "__main__":
main()

View File

@@ -1,16 +0,0 @@
from urllib.request import Request
from service import PingService
from starlette.responses import JSONResponse
from cpl.api import APILogger
from cpl.api.router import Router
@Router.authenticate()
# @Router.authorize(permissions=[Permissions.administrator])
# @Router.authorize(policies=["test"])
@Router.get(f"/ping")
async def ping(r: Request, ping: PingService, logger: APILogger):
logger.info(f"Ping: {ping}")
return JSONResponse(ping.ping(r))

View File

@@ -1,4 +0,0 @@
class PingService:
def ping(self, r):
return "pong"

View File

@@ -1,39 +0,0 @@
from cpl.application.abc import ApplicationABC
from cpl.auth.keycloak import KeycloakAdmin
from cpl.core.console import Console
from cpl.core.environment import Environment
from cpl.core.log import LoggerABC
from cpl.dependency import ServiceProviderABC
from model.city import City
from model.city_dao import CityDao
from model.user import User
from model.user_dao import UserDao
class Application(ApplicationABC):
def __init__(self, services: ServiceProviderABC):
ApplicationABC.__init__(self, services)
self._logger = services.get_service(LoggerABC)
async def test_daos(self):
userDao: UserDao = self._services.get_service(UserDao)
cityDao: CityDao = self._services.get_service(CityDao)
Console.write_line(await userDao.get_all())
if len(await cityDao.get_all()) == 0:
city_id = await cityDao.create(City(0, "Haren", "49733"))
await userDao.create(User(0, "NewUser", city_id))
Console.write_line(await userDao.get_all())
async def main(self):
self._logger.debug(f"Host: {Environment.get_host_name()}")
self._logger.debug(f"Environment: {Environment.get_environment()}")
await self.test_daos()
kc_admin: KeycloakAdmin = self._services.get_service(KeycloakAdmin)
x = kc_admin.get_users()
Console.write_line(x)

View File

@@ -1,8 +0,0 @@
{
"Logging": {
"Path": "logs/",
"Filename": "log_$start_time.log",
"ConsoleLevel": "TRACE",
"Level": "TRACE"
}
}

View File

@@ -1,5 +0,0 @@
from enum import Enum
class CustomPermissions(Enum):
test = "test"

View File

@@ -1,19 +0,0 @@
from application import Application
from cpl.application import ApplicationBuilder
from custom_permissions import CustomPermissions
from startup import Startup
def main():
builder = ApplicationBuilder(Application).with_startup(Startup)
app = builder.build()
app.with_logging()
app.with_permissions(CustomPermissions)
app.with_migrations("./scripts")
app.with_seeders()
app.run()
if __name__ == "__main__":
main()

View File

@@ -1,27 +0,0 @@
from application import Application
from cpl.application import ApplicationBuilder
from cpl.auth.permission.permissions_registry import PermissionsRegistry
from cpl.core.console import Console
from cpl.core.log import LogLevel
from custom_permissions import CustomPermissions
from startup import Startup
def main():
builder = ApplicationBuilder(Application).with_startup(Startup)
builder.services.add_logging()
app = builder.build()
app.with_logging(LogLevel.trace)
app.with_permissions(CustomPermissions)
app.with_migrations("./scripts")
app.with_seeders()
Console.write_line(CustomPermissions.test.value in PermissionsRegistry.get())
app.run()
Console.write_line("Hello from main_simplified.py!")
if __name__ == "__main__":
main()

View File

@@ -1,29 +0,0 @@
from datetime import datetime
from typing import Optional
from cpl.core.typing import SerialId
from cpl.database.abc.db_model_abc import DbModelABC
class City(DbModelABC):
def __init__(
self,
id: int,
name: str,
zip: str,
deleted: bool = False,
editor_id: Optional[SerialId] = None,
created: Optional[datetime] = None,
updated: Optional[datetime] = None,
):
DbModelABC.__init__(self, id, deleted, editor_id, created, updated)
self._name = name
self._zip = zip
@property
def name(self) -> str:
return self._name
@property
def zip(self) -> str:
return self._zip

View File

@@ -1,11 +0,0 @@
from cpl.database.abc import DbModelDaoABC
from model.city import City
class CityDao(DbModelDaoABC[City]):
def __init__(self):
DbModelDaoABC.__init__(self, City, "city")
self.attribute(City.name, str)
self.attribute(City.zip, int)

View File

@@ -1,30 +0,0 @@
from datetime import datetime
from typing import Optional
from cpl.core.typing import SerialId
from cpl.database.abc.db_model_abc import DbModelABC
class User(DbModelABC):
def __init__(
self,
id: int,
name: str,
city_id: int = 0,
deleted: bool = False,
editor_id: Optional[SerialId] = None,
created: Optional[datetime] = None,
updated: Optional[datetime] = None,
):
DbModelABC.__init__(self, id, deleted, editor_id, created, updated)
self._name = name
self._city_id = city_id
@property
def name(self) -> str:
return self._name
@property
def city_id(self) -> int:
return self._city_id

View File

@@ -1,13 +0,0 @@
from cpl.database.abc import DbModelDaoABC
from model.user import User
class UserDao(DbModelDaoABC[User]):
def __init__(self):
DbModelDaoABC.__init__(self, User, "users")
self.attribute(User.name, str)
self.attribute(User.city_id, int, db_name="CityId")
self.reference("city", "id", User.city_id, "city")

View File

@@ -1,22 +0,0 @@
CREATE TABLE IF NOT EXISTS `city` (
`id` INT(30) NOT NULL AUTO_INCREMENT,
`name` VARCHAR(64) NOT NULL,
`zip` VARCHAR(5) NOT NULL,
deleted BOOLEAN NOT NULL DEFAULT FALSE,
editorId INT NULL,
created TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY(`id`)
);
CREATE TABLE IF NOT EXISTS `users` (
`id` INT(30) NOT NULL AUTO_INCREMENT,
`name` VARCHAR(64) NOT NULL,
`cityId` INT(30),
deleted BOOLEAN NOT NULL DEFAULT FALSE,
editorId INT NULL,
created TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
FOREIGN KEY (`cityId`) REFERENCES city(`id`),
PRIMARY KEY(`id`)
);

View File

@@ -1,31 +0,0 @@
from cpl import auth
from cpl.application.abc.startup_abc import StartupABC
from cpl.auth import permission
from cpl.core.configuration import Configuration
from cpl.core.environment import Environment
from cpl.core.log import Logger, LoggerABC
from cpl.database import mysql
from cpl.database.abc.data_access_object_abc import DataAccessObjectABC
from cpl.dependency import ServiceCollection
from model.city_dao import CityDao
from model.user_dao import UserDao
class Startup(StartupABC):
@staticmethod
async def configure_configuration():
Configuration.add_json_file(f"appsettings.json")
Configuration.add_json_file(f"appsettings.{Environment.get_environment()}.json")
Configuration.add_json_file(f"appsettings.{Environment.get_host_name()}.json", optional=True)
@staticmethod
async def configure_services(services: ServiceCollection):
services.add_module(mysql)
services.add_module(auth)
services.add_module(permission)
services.add_transient(DataAccessObjectABC, UserDao)
services.add_transient(DataAccessObjectABC, CityDao)
services.add_singleton(LoggerABC, Logger)

View File

@@ -1,25 +0,0 @@
from cpl.application.abc import StartupABC
from cpl.dependency import ServiceProviderABC, ServiceCollection
from di.di_tester_service import DITesterService
from di.test1_service import Test1Service
from di.test2_service import Test2Service
from di.test_abc import TestABC
from di.test_service import TestService
from di.tester import Tester
class Startup(StartupABC):
def __init__(self):
StartupABC.__init__(self)
def configure_configuration(self): ...
def configure_services(self, services: ServiceCollection) -> ServiceProviderABC:
services.add_scoped(TestService)
services.add_scoped(DITesterService)
services.add_singleton(TestABC, Test1Service)
services.add_singleton(TestABC, Test2Service)
services.add_singleton(Tester)
return services.build()

View File

@@ -1,9 +0,0 @@
from cpl.dependency import ServiceProvider, ServiceProviderABC
from di.test_service import TestService
class StaticTest:
@staticmethod
@ServiceProvider.inject
def test(services: ServiceProviderABC, t1: TestService):
t1.run()

View File

@@ -1,12 +0,0 @@
import string
from cpl.core.console.console import Console
from cpl.core.utils.string import String
class TestService:
def __init__(self):
self._name = String.random(8)
def run(self):
Console.write_line(f"Im {self._name}")

View File

@@ -1,64 +0,0 @@
import time
from typing import Optional
from cpl.application.abc import ApplicationABC
from cpl.core.configuration import Configuration
from cpl.core.console import Console
from cpl.dependency import ServiceProviderABC
from cpl.core.environment import Environment
from cpl.core.log import LoggerABC
from cpl.core.pipes import IPAddressPipe
from cpl.mail import EMail, EMailClientABC
from cpl.query.extension.list import List
from test_service import TestService
from test_settings import TestSettings
class Application(ApplicationABC):
def __init__(self, services: ServiceProviderABC):
ApplicationABC.__init__(self, services)
self._logger = self._services.get_service(LoggerABC)
self._mailer = self._services.get_service(EMailClientABC)
def test_send_mail(self):
mail = EMail()
mail.add_header("Mime-Version: 1.0")
mail.add_header("Content-Type: text/plain; charset=utf-8")
mail.add_header("Content-Transfer-Encoding: quoted-printable")
mail.add_receiver("sven.heidemann@sh-edraft.de")
mail.subject = f"Test - {Environment.get_host_name()}"
mail.body = "Dies ist ein Test :D"
self._mailer.send_mail(mail)
@staticmethod
def _wait(time_ms: int):
time.sleep(time_ms)
def main(self):
self._logger.debug(f"Host: {Environment.get_host_name()}")
self._logger.debug(f"Environment: {Environment.get_environment()}")
Console.write_line(List(int, range(0, 10)).select(lambda x: f"x={x}").to_list())
Console.spinner("Test", self._wait, 2, spinner_foreground_color="red")
test: TestService = self._services.get_service(TestService)
ip_pipe: IPAddressPipe = self._services.get_service(IPAddressPipe)
test.run()
test2: TestService = self._services.get_service(TestService)
ip_pipe2: IPAddressPipe = self._services.get_service(IPAddressPipe)
Console.write_line(f"DI working: {test == test2 and ip_pipe != ip_pipe2}")
Console.write_line(self._services.get_service(LoggerABC))
scope = self._services.create_scope()
Console.write_line("scope", scope)
with self._services.create_scope() as s:
Console.write_line("with scope", s)
test_settings = Configuration.get(TestSettings)
Console.write_line(test_settings.value)
Console.write_line("reload config")
Configuration.add_json_file(f"appsettings.json")
Configuration.add_json_file(f"appsettings.{Environment.get_environment()}.json")
Configuration.add_json_file(f"appsettings.{Environment.get_host_name()}.json", optional=True)
test_settings1 = Configuration.get(TestSettings)
Console.write_line(test_settings1.value)
# self.test_send_mail()

View File

@@ -1,8 +0,0 @@
{
"Logging": {
"Path": "logs/",
"Filename": "log_$start_time.log",
"ConsoleLevel": "TRACE",
"Level": "TRACE"
}
}

View File

@@ -1,20 +0,0 @@
{
"TimeFormat": {
"DateFormat": "%Y-%m-%d",
"TimeFormat": "%H:%M:%S",
"DateTimeFormat": "%Y-%m-%d %H:%M:%S.%f",
"DateTimeLogFormat": "%Y-%m-%d_%H-%M-%S"
},
"Logging": {
"Path": "logs/",
"Filename": "log_$start_time.log",
"ConsoleLevel": "TRACE",
"Level": "TRACE"
},
"EMailClient": {
"Host": "mail.sh-edraft.de",
"Port": "587",
"UserName": "dev-srv@sh-edraft.de",
"Credentials": "RmBOQX1eNFYiYjgsSid3fV1nelc2WA=="
}
}

View File

@@ -1,23 +0,0 @@
from cpl import mail
from cpl.application.abc import StartupABC
from cpl.core.configuration import Configuration
from cpl.core.environment import Environment
from cpl.core.pipes import IPAddressPipe
from cpl.dependency import ServiceCollection
from test_service import TestService
class Startup(StartupABC):
@staticmethod
def configure_configuration():
Configuration.add_json_file(f"appsettings.json")
Configuration.add_json_file(f"appsettings.{Environment.get_environment()}.json")
Configuration.add_json_file(f"appsettings.{Environment.get_host_name()}.json", optional=True)
@staticmethod
def configure_services(services: ServiceCollection):
services.add_logging()
services.add_module(mail)
services.add_transient(IPAddressPipe)
services.add_singleton(TestService)

View File

@@ -1,10 +0,0 @@
from cpl.application.abc import ApplicationExtensionABC
from cpl.core.console import Console
from cpl.dependency import ServiceProviderABC
class TestExtension(ApplicationExtensionABC):
@staticmethod
def run(services: ServiceProviderABC):
Console.write_line("Hello World from App Extension")

View File

@@ -1,13 +0,0 @@
from cpl.core.console.console import Console
from cpl.dependency import ServiceProviderABC
from cpl.core.pipes.ip_address_pipe import IPAddressPipe
class TestService:
def __init__(self, provider: ServiceProviderABC):
self._provider = provider
def run(self):
Console.write_line("Hello World!", self._provider)
ip = [192, 168, 178, 30]
Console.write_line(ip, IPAddressPipe.to_str(ip))

View File

@@ -1,14 +0,0 @@
from cpl.application.abc import StartupExtensionABC
from cpl.core.console import Console
from cpl.dependency import ServiceCollection
class TestStartupExtension(StartupExtensionABC):
@staticmethod
def configure_configuration():
Console.write_line("config")
@staticmethod
def configure_services(services: ServiceCollection):
Console.write_line("services")

View File

@@ -1,60 +0,0 @@
from cpl.core.console import Console
from cpl.core.utils.benchmark import Benchmark
from cpl.query.enumerable import Enumerable
from cpl.query.immutable_list import ImmutableList
from cpl.query.list import List
from cpl.query.set import Set
def _default():
Console.write_line(Enumerable.empty().to_list())
Console.write_line(Enumerable.range(0, 100).length)
Console.write_line(Enumerable.range(0, 100).to_list())
Console.write_line(Enumerable.range(0, 100).where(lambda x: x % 2 == 0).length)
Console.write_line(
Enumerable.range(0, 100).where(lambda x: x % 2 == 0).to_list().select(lambda x: str(x)).to_list()
)
Console.write_line(List)
s =Enumerable.range(0, 10).to_set()
Console.write_line(s)
s.add(1)
Console.write_line(s)
data = Enumerable(
[
{"name": "Alice", "age": 30},
{"name": "Dave", "age": 35},
{"name": "Charlie", "age": 25},
{"name": "Bob", "age": 25},
]
)
Console.write_line(data.order_by(lambda x: x["age"]).to_list())
Console.write_line(data.order_by(lambda x: x["age"]).then_by(lambda x: x["name"]).to_list())
Console.write_line(data.order_by(lambda x: x["name"]).then_by(lambda x: x["age"]).to_list())
def t_benchmark(data: list):
Benchmark.all("Enumerable", lambda: Enumerable(data).where(lambda x: x % 2 == 0).select(lambda x: x * 2).to_list())
Benchmark.all("Set", lambda: Set(data).where(lambda x: x % 2 == 0).select(lambda x: x * 2).to_list())
Benchmark.all("List", lambda: List(data).where(lambda x: x % 2 == 0).select(lambda x: x * 2).to_list())
Benchmark.all(
"ImmutableList", lambda: ImmutableList(data).where(lambda x: x % 2 == 0).select(lambda x: x * 2).to_list()
)
Benchmark.all("List comprehension", lambda: [x * 2 for x in data if x % 2 == 0])
def main():
N = 10_000_000
data = list(range(N))
#t_benchmark(data)
Console.write_line()
_default()
if __name__ == "__main__":
main()

View File

@@ -1,17 +0,0 @@
from cpl.application import StartupABC
from cpl.core.configuration import ConfigurationABC
from cpl.dependency import ServiceProviderABC, ServiceCollection
from cpl.core.environment import Environment
class Startup(StartupABC):
def __init__(self):
StartupABC.__init__(self)
def configure_configuration(self, configuration: ConfigurationABC, environment: Environment) -> ConfigurationABC:
configuration.add_json_file("appsettings.json")
return configuration
def configure_services(self, services: ServiceCollection, environment: Environment) -> ServiceProviderABC:
services.add_translation()
return services.build()

View File

@@ -1,61 +0,0 @@
#!/usr/bin/env bash
set -euo pipefail
# Find and combine requirements from src/cpl-*/requirements.txt,
# filtering out lines whose *package name* starts with "cpl-".
# Works with pinned versions, extras, markers, editable installs, and VCS refs.
shopt -s nullglob
req_files=(src/cpl-*/requirements.txt)
if ((${#req_files[@]} == 0)); then
echo "No requirements files found at src/cpl-*/requirements.txt" >&2
exit 1
fi
tmp_combined="$(mktemp)"
trap 'rm -f "$tmp_combined"' EXIT
# Concatenate, trim comments/whitespace, filter out cpl-* packages, dedupe.
# We keep non-package options/flags/constraints as-is.
awk '
function trim(s){ sub(/^[[:space:]]+/,"",s); sub(/[[:space:]]+$/,"",s); return s }
{
line=$0
# drop full-line comments and strip inline comments
if (line ~ /^[[:space:]]*#/) next
sub(/#[^!].*$/,"",line) # strip trailing comment (simple heuristic)
line=trim(line)
if (line == "") next
# Determine the package *name* even for "-e", extras, pins, markers, or VCS "@"
e = line
sub(/^-e[[:space:]]+/,"",e) # remove editable prefix
# Tokenize up to the first of these separators: space, [ < > = ! ~ ; @
token = e
sub(/\[.*/,"",token) # remove extras quickly
n = split(token, a, /[<>=!~;@[:space:]]/)
name = tolower(a[1])
# If the first token (name) starts with "cpl-", skip this requirement
if (name ~ /^cpl-/) next
print line
}
' "${req_files[@]}" | sort -u > "$tmp_combined"
if ! [ -s "$tmp_combined" ]; then
echo "Nothing to install after filtering out cpl-* packages." >&2
exit 0
fi
echo "Installing dependencies (excluding cpl-*) from:"
printf ' - %s\n' "${req_files[@]}"
echo
echo "Final set to install:"
cat "$tmp_combined"
echo
# Use python -m pip for reliability; change to python3 if needed.
python -m pip install -r "$tmp_combined"

View File

@@ -1,36 +0,0 @@
from cpl.dependency.service_collection import ServiceCollection as _ServiceCollection
from .error import APIError, AlreadyExists, EndpointNotImplemented, Forbidden, NotFound, Unauthorized
from .logger import APILogger
from .settings import ApiSettings
def add_api(collection: _ServiceCollection):
try:
from cpl.database import mysql
collection.add_module(mysql)
except ImportError as e:
from cpl.core.errors import dependency_error
dependency_error("cpl-database", e)
try:
from cpl import auth
from cpl.auth import permission
collection.add_module(auth)
collection.add_module(permission)
except ImportError as e:
from cpl.core.errors import dependency_error
dependency_error("cpl-auth", e)
from cpl.api.registry.policy import PolicyRegistry
from cpl.api.registry.route import RouteRegistry
collection.add_singleton(PolicyRegistry)
collection.add_singleton(RouteRegistry)
_ServiceCollection.with_module(add_api, __name__)

View File

@@ -1 +0,0 @@
from .asgi_middleware_abc import ASGIMiddleware

View File

@@ -1,15 +0,0 @@
from abc import ABC, abstractmethod
from starlette.types import Scope, Receive, Send
class ASGIMiddleware(ABC):
@abstractmethod
def __init__(self, app):
self._app = app
def _call_next(self, scope: Scope, receive: Receive, send: Send):
return self._app(scope, receive, send)
@abstractmethod
async def __call__(self, scope: Scope, receive: Receive, send: Send): ...

View File

@@ -1 +0,0 @@
from .web_app import WebApp

View File

@@ -1,249 +0,0 @@
import os
from enum import Enum
from typing import Mapping, Any, Callable, Self, Union
import uvicorn
from starlette.applications import Starlette
from starlette.middleware import Middleware
from starlette.middleware.cors import CORSMiddleware
from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.types import ExceptionHandler
from cpl import api, auth
from cpl.api.error import APIError
from cpl.api.logger import APILogger
from cpl.api.middleware.authentication import AuthenticationMiddleware
from cpl.api.middleware.authorization import AuthorizationMiddleware
from cpl.api.middleware.logging import LoggingMiddleware
from cpl.api.middleware.request import RequestMiddleware
from cpl.api.model.api_route import ApiRoute
from cpl.api.model.policy import Policy
from cpl.api.model.validation_match import ValidationMatch
from cpl.api.registry.policy import PolicyRegistry
from cpl.api.registry.route import RouteRegistry
from cpl.api.router import Router
from cpl.api.settings import ApiSettings
from cpl.api.typing import HTTPMethods, PartialMiddleware, PolicyResolver
from cpl.application.abc.application_abc import ApplicationABC
from cpl.core.configuration import Configuration
from cpl.dependency.service_provider_abc import ServiceProviderABC
PolicyInput = Union[dict[str, PolicyResolver], Policy]
class WebApp(ApplicationABC):
def __init__(self, services: ServiceProviderABC):
super().__init__(services, [auth, api])
self._app: Starlette | None = None
self._logger = services.get_service(APILogger)
self._api_settings = Configuration.get(ApiSettings)
self._policies = services.get_service(PolicyRegistry)
self._routes = services.get_service(RouteRegistry)
self._middleware: list[Middleware] = [
Middleware(RequestMiddleware),
Middleware(LoggingMiddleware),
]
self._exception_handlers: Mapping[Any, ExceptionHandler] = {
Exception: self._handle_exception,
APIError: self._handle_exception,
}
async def _handle_exception(self, request: Request, exc: Exception):
if isinstance(exc, APIError):
self._logger.error(exc)
return JSONResponse({"error": str(exc)}, status_code=exc.status_code)
if hasattr(request.state, "request_id"):
self._logger.error(f"Request {request.state.request_id}", exc)
else:
self._logger.error("Request unknown", exc)
return JSONResponse({"error": str(exc)}, status_code=500)
def _get_allowed_origins(self):
origins = self._api_settings.allowed_origins
if origins is None or origins == "":
self._logger.warning("No allowed origins specified, allowing all origins")
return ["*"]
self._logger.debug(f"Allowed origins: {origins}")
return origins.split(",")
def with_database(self) -> Self:
self.with_migrations()
self.with_seeders()
return self
def with_app(self, app: Starlette) -> Self:
assert app is not None, "app must not be None"
assert isinstance(app, Starlette), "app must be an instance of Starlette"
self._app = app
return self
def _check_for_app(self):
if self._app is not None:
raise ValueError("App is already set, cannot add routes or middleware")
def with_routes_directory(self, directory: str) -> Self:
self._check_for_app()
assert directory is not None, "directory must not be None"
base = directory.replace("/", ".").replace("\\", ".")
for filename in os.listdir(directory):
if not filename.endswith(".py") or filename == "__init__.py":
continue
__import__(f"{base}.{filename[:-3]}")
return self
def with_routes(
self,
routes: list[ApiRoute],
method: HTTPMethods,
authentication: bool = False,
roles: list[str | Enum] = None,
permissions: list[str | Enum] = None,
policies: list[str] = None,
match: ValidationMatch = None,
) -> Self:
self._check_for_app()
assert self._routes is not None, "routes must not be None"
assert all(isinstance(route, ApiRoute) for route in routes), "all routes must be of type ApiRoute"
for route in routes:
self.with_route(
route.path,
route.fn,
method,
authentication,
roles,
permissions,
policies,
match,
)
return self
def with_route(
self,
path: str,
fn: Callable[[Request], Any],
method: HTTPMethods,
authentication: bool = False,
roles: list[str | Enum] = None,
permissions: list[str | Enum] = None,
policies: list[str] = None,
match: ValidationMatch = None,
) -> Self:
self._check_for_app()
assert path is not None, "path must not be None"
assert fn is not None, "fn must not be None"
assert method in [
"GET",
"HEAD",
"POST",
"PUT",
"PATCH",
"DELETE",
"OPTIONS",
], "method must be a valid HTTP method"
Router.route(path, method, registry=self._routes)(fn)
if authentication:
Router.authenticate()(fn)
if roles or permissions or policies:
Router.authorize(roles, permissions, policies, match)(fn)
return self
def with_middleware(self, middleware: PartialMiddleware) -> Self:
self._check_for_app()
if isinstance(middleware, Middleware):
self._middleware.append(middleware)
elif callable(middleware):
self._middleware.append(Middleware(middleware))
else:
raise ValueError("middleware must be of type starlette.middleware.Middleware or a callable")
return self
def with_authentication(self) -> Self:
self.with_middleware(AuthenticationMiddleware)
return self
def with_authorization(self, *policies: list[PolicyInput] | PolicyInput) -> Self:
if policies:
_policies = []
if not isinstance(policies, list):
policies = list(policies)
for i, policy in enumerate(policies):
if isinstance(policy, dict):
for name, resolver in policy.items():
if not isinstance(name, str):
self._logger.warning(f"Skipping policy at index {i}, name must be a string")
continue
if not callable(resolver):
self._logger.warning(f"Skipping policy {name}, resolver must be callable")
continue
_policies.append(Policy(name, resolver))
continue
_policies.append(policy)
self._policies.extend(_policies)
self.with_middleware(AuthorizationMiddleware)
return self
def _validate_policies(self):
for rule in Router.get_authorization_rules():
for policy_name in rule["policies"]:
policy = self._policies.get(policy_name)
if not policy:
self._logger.fatal(f"Authorization policy '{policy_name}' not found")
async def main(self):
self._logger.debug(f"Preparing API")
self._validate_policies()
if self._app is None:
routes = [route.to_starlette(self._services.inject) for route in self._routes.all()]
app = Starlette(
routes=routes,
middleware=[
*self._middleware,
Middleware(
CORSMiddleware,
allow_origins=self._get_allowed_origins(),
allow_methods=["*"],
allow_headers=["*"],
),
],
exception_handlers=self._exception_handlers,
)
else:
app = self._app
self._logger.info(f"Start API on {self._api_settings.host}:{self._api_settings.port}")
config = uvicorn.Config(
app, host=self._api_settings.host, port=self._api_settings.port, log_config=None, loop="asyncio"
)
server = uvicorn.Server(config)
await server.serve()
self._logger.info("Shutdown API")

View File

@@ -1,46 +0,0 @@
from http.client import HTTPException
from starlette.responses import JSONResponse
from starlette.types import Scope, Receive, Send
class APIError(HTTPException):
status_code = 500
def __init__(self, message: str = ""):
super().__init__(self.status_code, message)
self._message = message
@property
def error_message(self) -> str:
if self._message:
return f"{type(self).__name__}: {self._message}"
return f"{type(self).__name__}"
async def asgi_response(self, scope: Scope, receive: Receive, send: Send):
r = JSONResponse({"error": self.error_message}, status_code=self.status_code)
return await r(scope, receive, send)
def response(self):
return JSONResponse({"error": self.error_message}, status_code=self.status_code)
class Unauthorized(APIError):
status_code = 401
class Forbidden(APIError):
status_code = 403
class NotFound(APIError):
status_code = 404
class AlreadyExists(APIError):
status_code = 409
class EndpointNotImplemented(APIError):
status_code = 501

View File

@@ -1,7 +0,0 @@
from cpl.core.log.wrapped_logger import WrappedLogger
class APILogger(WrappedLogger):
def __init__(self):
WrappedLogger.__init__(self, "api")

View File

@@ -1,4 +0,0 @@
from .authentication import AuthenticationMiddleware
from .authorization import AuthorizationMiddleware
from .logging import LoggingMiddleware
from .request import RequestMiddleware

View File

@@ -1,80 +0,0 @@
from keycloak import KeycloakAuthenticationError
from starlette.types import Scope, Receive, Send
from cpl.api.abc.asgi_middleware_abc import ASGIMiddleware
from cpl.api.error import Unauthorized
from cpl.api.logger import APILogger
from cpl.api.middleware.request import get_request
from cpl.api.router import Router
from cpl.auth.keycloak import KeycloakClient
from cpl.auth.schema import AuthUserDao, AuthUser
from cpl.core.ctx import set_user
from cpl.dependency import ServiceProviderABC
class AuthenticationMiddleware(ASGIMiddleware):
@ServiceProviderABC.inject
def __init__(self, app, logger: APILogger, keycloak: KeycloakClient, user_dao: AuthUserDao):
ASGIMiddleware.__init__(self, app)
self._logger = logger
self._keycloak = keycloak
self._user_dao = user_dao
async def __call__(self, scope: Scope, receive: Receive, send: Send):
request = get_request()
url = request.url.path
if url not in Router.get_auth_required_routes():
self._logger.trace(f"No authentication required for {url}")
return await self._app(scope, receive, send)
if not request.headers.get("Authorization"):
self._logger.debug(f"Unauthorized access to {url}, missing Authorization header")
return await Unauthorized(f"Missing header Authorization").asgi_response(scope, receive, send)
auth_header = request.headers.get("Authorization", None)
if not auth_header or not auth_header.startswith("Bearer "):
return await Unauthorized("Invalid Authorization header").asgi_response(scope, receive, send)
token = auth_header.split("Bearer ")[1]
if not await self._verify_login(token):
self._logger.debug(f"Unauthorized access to {url}, invalid token")
return await Unauthorized("Invalid token").asgi_response(scope, receive, send)
# check user exists in db, if not create
keycloak_id = self._keycloak.get_user_id(token)
if keycloak_id is None:
return await Unauthorized("Failed to get user id from token").asgi_response(scope, receive, send)
user = await self._get_or_crate_user(keycloak_id)
if user.deleted:
self._logger.debug(f"Unauthorized access to {url}, user is deleted")
return await Unauthorized("User is deleted").asgi_response(scope, receive, send)
request.state.user = user
set_user(user)
return await self._call_next(scope, receive, send)
async def _get_or_crate_user(self, keycloak_id: str) -> AuthUser:
existing = await self._user_dao.find_by_keycloak_id(keycloak_id)
if existing is not None:
return existing
user = AuthUser(0, keycloak_id)
uid = await self._user_dao.create(user)
return await self._user_dao.get_by_id(uid)
async def _verify_login(self, token: str) -> bool:
try:
token_info = self._keycloak.introspect(token)
return token_info.get("active", False)
except KeycloakAuthenticationError as e:
self._logger.debug(f"Keycloak authentication error: {e}")
return False
except Exception as e:
self._logger.error(f"Unexpected error during token verification: {e}")
return False

View File

@@ -1,73 +0,0 @@
from starlette.types import Scope, Receive, Send
from cpl.api.abc.asgi_middleware_abc import ASGIMiddleware
from cpl.api.error import Unauthorized, Forbidden
from cpl.api.logger import APILogger
from cpl.api.middleware.request import get_request
from cpl.api.model.validation_match import ValidationMatch
from cpl.api.registry.policy import PolicyRegistry
from cpl.api.router import Router
from cpl.auth.schema._administration.auth_user_dao import AuthUserDao
from cpl.core.ctx.user_context import get_user
from cpl.dependency.service_provider_abc import ServiceProviderABC
class AuthorizationMiddleware(ASGIMiddleware):
@ServiceProviderABC.inject
def __init__(self, app, logger: APILogger, policies: PolicyRegistry, user_dao: AuthUserDao):
ASGIMiddleware.__init__(self, app)
self._logger = logger
self._policies = policies
self._user_dao = user_dao
async def __call__(self, scope: Scope, receive: Receive, send: Send):
request = get_request()
url = request.url.path
if url not in Router.get_authorization_rules_paths():
self._logger.trace(f"No authorization required for {url}")
return await self._app(scope, receive, send)
user = get_user()
if not user:
return await Unauthorized(f"Unknown user").asgi_response(scope, receive, send)
roles = await user.roles
request.state.roles = roles
role_names = [r.name for r in roles]
perms = await user.permissions
request.state.permissions = perms
perm_names = [p.name for p in perms]
for rule in Router.get_authorization_rules():
match = rule["match"]
if rule["roles"]:
if match == ValidationMatch.all and not all(r in role_names for r in rule["roles"]):
return await Forbidden(f"missing roles: {rule["roles"]}").asgi_response(scope, receive, send)
if match == ValidationMatch.any and not any(r in role_names for r in rule["roles"]):
return await Forbidden(f"missing roles: {rule["roles"]}").asgi_response(scope, receive, send)
if rule["permissions"]:
if match == ValidationMatch.all and not all(p in perm_names for p in rule["permissions"]):
return await Forbidden(f"missing permissions: {rule["permissions"]}").asgi_response(
scope, receive, send
)
if match == ValidationMatch.any and not any(p in perm_names for p in rule["permissions"]):
return await Forbidden(f"missing permissions: {rule["permissions"]}").asgi_response(
scope, receive, send
)
for policy_name in rule["policies"]:
policy = self._policies.get(policy_name)
if not policy:
self._logger.warning(f"Authorization policy '{policy_name}' not found")
continue
if not await policy.resolve(user):
return await Forbidden(f"policy {policy.name} failed").asgi_response(scope, receive, send)
return await self._call_next(scope, receive, send)

View File

@@ -1,87 +0,0 @@
import time
from starlette.requests import Request
from starlette.types import Receive, Scope, Send
from cpl.api.abc.asgi_middleware_abc import ASGIMiddleware
from cpl.api.logger import APILogger
from cpl.api.middleware.request import get_request
from cpl.dependency import ServiceProviderABC
class LoggingMiddleware(ASGIMiddleware):
@ServiceProviderABC.inject
def __init__(self, app, logger: APILogger):
ASGIMiddleware.__init__(self, app)
self._logger = logger
async def __call__(self, scope: Scope, receive: Receive, send: Send):
if scope["type"] != "http":
await self._call_next(scope, receive, send)
return
request = get_request()
await self._log_request(request)
start_time = time.time()
response_body = b""
status_code = 500
async def send_wrapper(message):
nonlocal response_body, status_code
if message["type"] == "http.response.start":
status_code = message["status"]
if message["type"] == "http.response.body":
response_body += message.get("body", b"")
await send(message)
await self._call_next(scope, receive, send_wrapper)
duration = (time.time() - start_time) * 1000
await self._log_after_request(request, status_code, duration)
@staticmethod
def _filter_relevant_headers(headers: dict) -> dict:
relevant_keys = {
"content-type",
"host",
"connection",
"user-agent",
"origin",
"referer",
"accept",
}
return {key: value for key, value in headers.items() if key in relevant_keys}
async def _log_request(self, request: Request):
self._logger.debug(
f"Request {getattr(request.state, 'request_id', '-')}: {request.method}@{request.url.path} from {request.client.host}"
)
from cpl.core.ctx.user_context import get_user
user = get_user()
request_info = {
"headers": self._filter_relevant_headers(dict(request.headers)),
"args": dict(request.query_params),
"form-data": (
await request.form()
if request.headers.get("content-type") == "application/x-www-form-urlencoded"
else None
),
"payload": (await request.json() if request.headers.get("content-length") == "0" else None),
"user": f"{user.id}-{user.keycloak_id}" if user else None,
"files": (
{key: file.filename for key, file in (await request.form()).items()} if await request.form() else None
),
}
self._logger.trace(f"Request {getattr(request.state, 'request_id', '-')}: {request_info}")
async def _log_after_request(self, request: Request, status_code: int, duration: float):
self._logger.info(
f"Request finished {getattr(request.state, 'request_id', '-')}: {status_code}-{request.method}@{request.url.path} from {request.client.host} in {duration:.2f}ms"
)

View File

@@ -1,56 +0,0 @@
import time
from contextvars import ContextVar
from typing import Optional, Union
from uuid import uuid4
from starlette.requests import Request
from starlette.types import Scope, Receive, Send
from cpl.api.abc.asgi_middleware_abc import ASGIMiddleware
from cpl.api.logger import APILogger
from cpl.api.typing import TRequest
from cpl.dependency import ServiceProviderABC
_request_context: ContextVar[Union[TRequest, None]] = ContextVar("request", default=None)
class RequestMiddleware(ASGIMiddleware):
@ServiceProviderABC.inject
def __init__(self, app, logger: APILogger):
ASGIMiddleware.__init__(self, app)
self._logger = logger
self._ctx_token = None
async def __call__(self, scope: Scope, receive: Receive, send: Send):
request = Request(scope, receive, send)
await self.set_request_data(request)
try:
await self._app(scope, receive, send)
finally:
await self.clean_request_data()
async def set_request_data(self, request: TRequest):
request.state.request_id = uuid4()
request.state.start_time = time.time()
self._logger.trace(f"Set new current request: {request.state.request_id}")
self._ctx_token = _request_context.set(request)
async def clean_request_data(self):
request = get_request()
if request is None:
return
if self._ctx_token is None:
return
self._logger.trace(f"Clearing current request: {request.state.request_id}")
_request_context.reset(self._ctx_token)
def get_request() -> Optional[TRequest]:
return _request_context.get()

View File

@@ -1,3 +0,0 @@
from .api_route import ApiRoute
from .policy import Policy
from .validation_match import ValidationMatch

View File

@@ -1,43 +0,0 @@
from typing import Callable
from starlette.routing import Route
from cpl.api.typing import HTTPMethods
class ApiRoute:
def __init__(self, path: str, fn: Callable, method: HTTPMethods, **kwargs):
self._path = path
self._fn = fn
self._method = method
self._kwargs = kwargs
@property
def name(self) -> str:
return self._fn.__name__
@property
def fn(self) -> Callable:
return self._fn
@property
def path(self) -> str:
return self._path
@property
def method(self) -> HTTPMethods:
return self._method
@property
def kwargs(self) -> dict:
return self._kwargs
def to_starlette(self, wrap_endpoint: Callable = None) -> Route:
return Route(
self._path,
self._fn if not wrap_endpoint else wrap_endpoint(self._fn),
methods=[self._method],
**self._kwargs,
)

View File

@@ -1,34 +0,0 @@
from asyncio import iscoroutinefunction
from typing import Optional
from cpl.api.typing import PolicyResolver
from cpl.core.ctx import get_user
class Policy:
def __init__(
self,
name: str,
resolver: PolicyResolver = None,
):
self._name = name
self._resolver: Optional[PolicyResolver] = resolver
@property
def name(self) -> str:
return self._name
@property
def resolvers(self) -> PolicyResolver:
return self._resolver
async def resolve(self, *args, **kwargs) -> bool:
if not self._resolver:
return True
if callable(self._resolver):
if iscoroutinefunction(self._resolver):
return await self._resolver(get_user())
return self._resolver(get_user())
return False

View File

@@ -1,6 +0,0 @@
from enum import Enum
class ValidationMatch(Enum):
any = "any"
all = "all"

View File

@@ -1,2 +0,0 @@
from .policy import PolicyRegistry
from .route import RouteRegistry

View File

@@ -1,28 +0,0 @@
from typing import Optional
from cpl.api.model.policy import Policy
from cpl.core.abc.registry_abc import RegistryABC
class PolicyRegistry(RegistryABC):
def __init__(self):
RegistryABC.__init__(self)
def extend(self, items: list[Policy]):
for policy in items:
self.add(policy)
def add(self, item: Policy):
assert isinstance(item, Policy), "policy must be an instance of Policy"
if item.name in self._items:
raise ValueError(f"Policy {item.name} is already registered")
self._items[item.name] = item
def get(self, key: str) -> Optional[Policy]:
return self._items.get(key)
def all(self) -> list[Policy]:
return list(self._items.values())

View File

@@ -1,32 +0,0 @@
from typing import Optional
from cpl.api.model.api_route import ApiRoute
from cpl.core.abc.registry_abc import RegistryABC
class RouteRegistry(RegistryABC):
def __init__(self):
RegistryABC.__init__(self)
def extend(self, items: list[ApiRoute]):
for policy in items:
self.add(policy)
def add(self, item: ApiRoute):
assert isinstance(item, ApiRoute), "route must be an instance of ApiRoute"
if item.path in self._items:
raise ValueError(f"ApiRoute {item.path} is already registered")
self._items[item.path] = item
def set(self, item: ApiRoute):
assert isinstance(item, ApiRoute), "route must be an instance of ApiRoute"
self._items[item.path] = item
def get(self, key: str) -> Optional[ApiRoute]:
return self._items.get(key)
def all(self) -> list[ApiRoute]:
return list(self._items.values())

View File

@@ -1,164 +0,0 @@
from enum import Enum
from cpl.api.model.validation_match import ValidationMatch
from cpl.api.registry.route import RouteRegistry
from cpl.api.typing import HTTPMethods
class Router:
_auth_required: list[str] = []
_authorization_rules: dict[str, dict] = {}
@classmethod
def get_auth_required_routes(cls) -> list[str]:
return cls._auth_required
@classmethod
def get_authorization_rules_paths(cls) -> list[str]:
return list(cls._authorization_rules.keys())
@classmethod
def get_authorization_rules(cls) -> list[dict]:
return list(cls._authorization_rules.values())
@classmethod
def authenticate(cls):
"""
Decorator to mark a route as requiring authentication.
Usage:
@Route.authenticate()
@Route.get("/example")
async def example_endpoint(request: TRequest):
...
"""
def inner(fn):
route_path = getattr(fn, "_route_path", None)
if route_path and route_path not in cls._auth_required:
cls._auth_required.append(route_path)
return fn
return inner
@classmethod
def authorize(
cls,
roles: list[str | Enum] = None,
permissions: list[str | Enum] = None,
policies: list[str] = None,
match: ValidationMatch = None,
):
"""
Decorator to mark a route as requiring authorization.
Usage:
@Route.authorize()
@Route.get("/example")
async def example_endpoint(request: TRequest):
...
"""
assert roles is None or isinstance(roles, list), "roles must be a list of strings"
assert permissions is None or isinstance(permissions, list), "permissions must be a list of strings"
assert policies is None or isinstance(policies, list), "policies must be a list of strings"
assert match is None or isinstance(match, ValidationMatch), "match must be an instance of ValidationMatch"
if roles is not None:
for role in roles:
if isinstance(role, Enum):
roles[roles.index(role)] = role.value
if permissions is not None:
for perm in permissions:
if isinstance(perm, Enum):
permissions[permissions.index(perm)] = perm.value
def inner(fn):
path = getattr(fn, "_route_path", None)
if not path:
return fn
if path in cls._authorization_rules:
raise ValueError(f"Route {path} is already registered for authorization")
cls._authorization_rules[path] = {
"roles": roles or [],
"permissions": permissions or [],
"policies": policies or [],
"match": match or ValidationMatch.all,
}
return fn
return inner
@classmethod
def route(cls, path: str, method: HTTPMethods, registry: RouteRegistry = None, **kwargs):
from cpl.api.model.api_route import ApiRoute
if not registry:
from cpl.dependency.service_provider_abc import ServiceProviderABC
routes = ServiceProviderABC.get_global_service(RouteRegistry)
else:
routes = registry
def inner(fn):
routes.add(ApiRoute(path, fn, method, **kwargs))
setattr(fn, "_route_path", path)
return fn
return inner
@classmethod
def get(cls, path: str, **kwargs):
return cls.route(path, "GET", **kwargs)
@classmethod
def head(cls, path: str, **kwargs):
return cls.route(path, "HEAD", **kwargs)
@classmethod
def post(cls, path: str, **kwargs):
return cls.route(path, "POST", **kwargs)
@classmethod
def put(cls, path: str, **kwargs):
return cls.route(path, "PUT", **kwargs)
@classmethod
def patch(cls, path: str, **kwargs):
return cls.route(path, "PATCH", **kwargs)
@classmethod
def delete(cls, path: str, **kwargs):
return cls.route(path, "DELETE", **kwargs)
@classmethod
def override(cls):
"""
Decorator to override an existing route with the same path.
Usage:
@Route.override()
@Route.get("/example")
async def example_endpoint(request: TRequest):
...
"""
from cpl.api.model.api_route import ApiRoute
from cpl.dependency.service_provider_abc import ServiceProviderABC
routes = ServiceProviderABC.get_global_service(RouteRegistry)
def inner(fn):
path = getattr(fn, "_route_path", None)
if path is None:
raise ValueError("Cannot override a route that has not been registered yet")
route = routes.get(path)
if route is None:
raise ValueError(f"Cannot override a route that does not exist: {path}")
routes.add(ApiRoute(path, fn, route.method, **route.kwargs))
setattr(fn, "_route_path", path)
return fn
return inner

View File

@@ -1,13 +0,0 @@
from typing import Optional
from cpl.core.configuration import ConfigurationModelABC
class ApiSettings(ConfigurationModelABC):
def __init__(self, src: Optional[dict] = None):
super().__init__(src)
self.option("host", str, "0.0.0.0")
self.option("port", int, 5000)
self.option("allowed_origins", list[str])

View File

@@ -1,19 +0,0 @@
from typing import Union, Literal, Callable, Type, Awaitable
from urllib.request import Request
from starlette.middleware import Middleware
from starlette.types import ASGIApp
from starlette.websockets import WebSocket
from cpl.api.abc.asgi_middleware_abc import ASGIMiddleware
from cpl.auth.schema import AuthUser
TRequest = Union[Request, WebSocket]
HTTPMethods = Literal["GET", "HEAD", "POST", "PUT", "PATCH", "DELETE", "OPTIONS"]
PartialMiddleware = Union[
ASGIMiddleware,
Type[ASGIMiddleware],
Middleware,
Callable[[ASGIApp], ASGIApp],
]
PolicyResolver = Callable[[AuthUser], bool | Awaitable[bool]]

View File

@@ -1,30 +0,0 @@
[build-system]
requires = ["setuptools>=70.1.0", "wheel>=0.43.0"]
build-backend = "setuptools.build_meta"
[project]
name = "cpl-api"
version = "2024.7.0"
description = "CPL api"
readme ="CPL api package"
requires-python = ">=3.12"
license = { text = "MIT" }
authors = [
{ name = "Sven Heidemann", email = "sven.heidemann@sh-edraft.de" }
]
keywords = ["cpl", "api", "backend", "shared", "library"]
dynamic = ["dependencies", "optional-dependencies"]
[project.urls]
Homepage = "https://www.sh-edraft.de"
[tool.setuptools.packages.find]
where = ["."]
include = ["cpl*"]
[tool.setuptools.dynamic]
dependencies = { file = ["requirements.txt"] }
optional-dependencies.dev = { file = ["requirements.dev.txt"] }

View File

@@ -1 +0,0 @@
black==25.1.0

View File

@@ -1,7 +0,0 @@
cpl-auth
cpl-application
cpl-core
cpl-dependency
starlette==0.48.0
python-multipart==0.0.20
uvicorn==0.35.0

View File

@@ -1 +0,0 @@
from .application_builder import ApplicationBuilder

View File

@@ -1,4 +0,0 @@
from .application_abc import ApplicationABC
from .application_extension_abc import ApplicationExtensionABC
from .startup_abc import StartupABC
from .startup_extension_abc import StartupExtensionABC

View File

@@ -1,92 +0,0 @@
from abc import ABC, abstractmethod
from typing import Callable, Self
from cpl.application.host import Host
from cpl.core.log.log_level import LogLevel
from cpl.core.log.log_settings import LogSettings
from cpl.core.log.logger_abc import LoggerABC
from cpl.dependency.service_provider_abc import ServiceProviderABC
def __not_implemented__(package: str, func: Callable):
raise NotImplementedError(f"Package {package} is required to use {func.__name__} method")
class ApplicationABC(ABC):
r"""ABC for the Application class
Parameters:
services: :class:`cpl.dependency.service_provider_abc.ServiceProviderABC`
Contains instances of prepared objects
"""
@abstractmethod
def __init__(self, services: ServiceProviderABC, required_modules: list[str | object] = None):
self._services = services
self._required_modules = (
[x.__name__ if not isinstance(x, str) else x for x in required_modules] if required_modules else []
)
@property
def required_modules(self) -> list[str]:
return self._required_modules
@classmethod
def extend(cls, name: str | Callable, func: Callable[[Self], Self]):
r"""Extend the Application with a custom method
Parameters:
name: :class:`str`
Name of the method
func: :class:`Callable[[Self], Self]`
Function that takes the Application as a parameter and returns it
"""
if callable(name):
name = name.__name__
setattr(cls, name, func)
return cls
def with_logging(self, level: LogLevel = None):
if level is None:
from cpl.core.configuration.configuration import Configuration
settings = Configuration.get(LogSettings)
level = settings.level if settings else LogLevel.info
logger = self._services.get_service(LoggerABC)
logger.set_level(level)
def with_permissions(self, *args, **kwargs):
__not_implemented__("cpl-auth", self.with_permissions)
def with_migrations(self, *args, **kwargs):
__not_implemented__("cpl-database", self.with_migrations)
def with_seeders(self, *args, **kwargs):
__not_implemented__("cpl-database", self.with_seeders)
def with_extension(self, func: Callable[[Self, ...], None], *args, **kwargs):
r"""Extend the Application with a custom method
Parameters:
func: :class:`Callable[[Self], Self]`
Function that takes the Application as a parameter and returns it
"""
assert func is not None, "func must not be None"
assert callable(func), "func must be callable"
func(self, *args, **kwargs)
def run(self):
r"""Entry point
Called by custom Application.main
"""
try:
Host.run(self.main)
except KeyboardInterrupt:
pass
@abstractmethod
def main(self): ...

View File

@@ -1,10 +0,0 @@
from abc import ABC, abstractmethod
from cpl.dependency.service_provider_abc import ServiceProviderABC
class ApplicationExtensionABC(ABC):
@staticmethod
@abstractmethod
def run(services: ServiceProviderABC): ...

View File

@@ -1,21 +0,0 @@
from abc import ABC, abstractmethod
from cpl.dependency.service_collection import ServiceCollection
class StartupABC(ABC):
r"""ABC for the startup class"""
@staticmethod
@abstractmethod
def configure_configuration():
r"""Creates configuration of application"""
@staticmethod
@abstractmethod
def configure_services(service: ServiceCollection):
r"""Creates service provider
Parameter:
services: :class:`cpl.dependency.service_collection`
"""

View File

@@ -1,20 +0,0 @@
from abc import ABC, abstractmethod
from cpl.dependency import ServiceCollection
class StartupExtensionABC(ABC):
r"""ABC for startup extension classes"""
@staticmethod
@abstractmethod
def configure_configuration():
r"""Creates configuration of application"""
@staticmethod
@abstractmethod
def configure_services(services: ServiceCollection):
r"""Creates service provider
Parameter:
services: :class:`cpl.dependency.service_collection`
"""

View File

@@ -1,80 +0,0 @@
import asyncio
from typing import Type, Optional, TypeVar, Generic
from cpl.application.abc.application_abc import ApplicationABC
from cpl.application.abc.application_extension_abc import ApplicationExtensionABC
from cpl.application.abc.startup_abc import StartupABC
from cpl.application.abc.startup_extension_abc import StartupExtensionABC
from cpl.application.host import Host
from cpl.core.errors import dependency_error
from cpl.dependency.service_collection import ServiceCollection
TApp = TypeVar("TApp", bound=ApplicationABC)
class ApplicationBuilder(Generic[TApp]):
def __init__(self, app: Type[ApplicationABC]):
assert app is not None, "app must not be None"
assert issubclass(app, ApplicationABC), "app must be an subclass of ApplicationABC or its subclass"
self._app = app if app is not None else ApplicationABC
self._services = ServiceCollection()
self._startup: Optional[StartupABC] = None
self._app_extensions: list[Type[ApplicationExtensionABC]] = []
self._startup_extensions: list[Type[StartupExtensionABC]] = []
self._async_loop = asyncio.get_event_loop()
@property
def services(self) -> ServiceCollection:
return self._services
@property
def service_provider(self):
return self._services.build()
def validate_app_required_modules(self, app: ApplicationABC):
for module in app.required_modules:
if module in self._services.loaded_modules:
continue
dependency_error(
module,
ImportError(
f"Required module '{module}' for application '{app.__class__.__name__}' is not loaded. Load using 'add_module({module})' method."
),
)
def with_startup(self, startup: Type[StartupABC]) -> "ApplicationBuilder":
self._startup = startup
return self
def with_extension(
self,
extension: Type[ApplicationExtensionABC | StartupExtensionABC],
) -> "ApplicationBuilder":
if (issubclass(extension, ApplicationExtensionABC)) and extension not in self._app_extensions:
self._app_extensions.append(extension)
elif (issubclass(extension, StartupExtensionABC)) and extension not in self._startup_extensions:
self._startup_extensions.append(extension)
return self
def build(self) -> TApp:
for extension in self._startup_extensions:
Host.run(extension.configure_configuration)
Host.run(extension.configure_services, self._services)
if self._startup is not None:
Host.run(self._startup.configure_configuration)
Host.run(self._startup.configure_services, self._services)
for extension in self._app_extensions:
Host.run(extension.run, self.service_provider)
app = self._app(self.service_provider)
self.validate_app_required_modules(app)
return app

View File

@@ -1,17 +0,0 @@
import asyncio
from typing import Callable
class Host:
_loop = asyncio.get_event_loop()
@classmethod
def get_loop(cls):
return cls._loop
@classmethod
def run(cls, func: Callable, *args, **kwargs):
if asyncio.iscoroutinefunction(func):
return cls._loop.run_until_complete(func(*args, **kwargs))
return func(*args, **kwargs)

View File

@@ -1,30 +0,0 @@
[build-system]
requires = ["setuptools>=70.1.0", "wheel>=0.43.0"]
build-backend = "setuptools.build_meta"
[project]
name = "cpl-application"
version = "2024.7.0"
description = "CPL application"
readme ="CPL application package"
requires-python = ">=3.12"
license = { text = "MIT" }
authors = [
{ name = "Sven Heidemann", email = "sven.heidemann@sh-edraft.de" }
]
keywords = ["cpl", "application", "backend", "shared", "library"]
dynamic = ["dependencies", "optional-dependencies"]
[project.urls]
Homepage = "https://www.sh-edraft.de"
[tool.setuptools.packages.find]
where = ["."]
include = ["cpl*"]
[tool.setuptools.dynamic]
dependencies = { file = ["requirements.txt"] }
optional-dependencies.dev = { file = ["requirements.dev.txt"] }

View File

@@ -1 +0,0 @@
black==25.1.0

View File

@@ -1,2 +0,0 @@
cpl-core
cpl-dependency

View File

@@ -1,84 +0,0 @@
from enum import Enum
from typing import Type
from cpl.application.abc import ApplicationABC as _ApplicationABC
from cpl.auth import permission as _permission
from cpl.auth.keycloak.keycloak_admin import KeycloakAdmin as _KeycloakAdmin
from cpl.auth.keycloak.keycloak_client import KeycloakClient as _KeycloakClient
from cpl.dependency.service_collection import ServiceCollection as _ServiceCollection
from .logger import AuthLogger
from .keycloak_settings import KeycloakSettings
from .permission_seeder import PermissionSeeder
def _with_permissions(self: _ApplicationABC, *permissions: Type[Enum]) -> _ApplicationABC:
from cpl.auth.permission.permissions_registry import PermissionsRegistry
for perm in permissions:
PermissionsRegistry.with_enum(perm)
return self
def _add_daos(collection: _ServiceCollection):
from .schema._administration.auth_user_dao import AuthUserDao
from .schema._administration.api_key_dao import ApiKeyDao
from .schema._permission.api_key_permission_dao import ApiKeyPermissionDao
from .schema._permission.permission_dao import PermissionDao
from .schema._permission.role_dao import RoleDao
from .schema._permission.role_permission_dao import RolePermissionDao
from .schema._permission.role_user_dao import RoleUserDao
collection.add_singleton(AuthUserDao)
collection.add_singleton(ApiKeyDao)
collection.add_singleton(ApiKeyPermissionDao)
collection.add_singleton(PermissionDao)
collection.add_singleton(RoleDao)
collection.add_singleton(RolePermissionDao)
collection.add_singleton(RoleUserDao)
def add_auth(collection: _ServiceCollection):
import os
try:
from cpl.database.service.migration_service import MigrationService
from cpl.database.model.server_type import ServerType, ServerTypes
collection.add_singleton(_KeycloakClient)
collection.add_singleton(_KeycloakAdmin)
_add_daos(collection)
provider = collection.build()
migration_service: MigrationService = provider.get_service(MigrationService)
if ServerType.server_type == ServerTypes.POSTGRES:
migration_service.with_directory(
os.path.join(os.path.dirname(os.path.realpath(__file__)), "scripts/postgres")
)
elif ServerType.server_type == ServerTypes.MYSQL:
migration_service.with_directory(os.path.join(os.path.dirname(os.path.realpath(__file__)), "scripts/mysql"))
except ImportError as e:
from cpl.core.console import Console
Console.error("cpl-database is not installed", str(e))
def add_permission(collection: _ServiceCollection):
from .permission_seeder import PermissionSeeder
from .permission.permissions_registry import PermissionsRegistry
from .permission.permissions import Permissions
try:
from cpl.database.abc.data_seeder_abc import DataSeederABC
collection.add_singleton(DataSeederABC, PermissionSeeder)
PermissionsRegistry.with_enum(Permissions)
except ImportError as e:
from cpl.core.console import Console
Console.error("cpl-database is not installed", str(e))
_ServiceCollection.with_module(add_auth, __name__)
_ServiceCollection.with_module(add_permission, _permission.__name__)
_ApplicationABC.extend(_ApplicationABC.with_permissions, _with_permissions)

View File

@@ -1,3 +0,0 @@
from .keycloak_admin import KeycloakAdmin
from .keycloak_client import KeycloakClient
from .keycloak_user import KeycloakUser

View File

@@ -1,22 +0,0 @@
from keycloak import KeycloakAdmin as _KeycloakAdmin, KeycloakOpenIDConnection
from cpl.auth.keycloak_settings import KeycloakSettings
from cpl.auth.logger import AuthLogger
class KeycloakAdmin(_KeycloakAdmin):
def __init__(self, logger: AuthLogger, settings: KeycloakSettings):
# logger.info("Initializing Keycloak admin")
_connection = KeycloakOpenIDConnection(
server_url=settings.url,
client_id=settings.client_id,
realm_name=settings.realm,
client_secret_key=settings.client_secret,
)
_KeycloakAdmin.__init__(
self,
connection=_connection,
)
self.__connection = _connection

View File

@@ -1,23 +0,0 @@
from typing import Optional
from keycloak import KeycloakOpenID
from cpl.auth.logger import AuthLogger
from cpl.auth.keycloak_settings import KeycloakSettings
class KeycloakClient(KeycloakOpenID):
def __init__(self, logger: AuthLogger, settings: KeycloakSettings):
KeycloakOpenID.__init__(
self,
server_url=settings.url,
client_id=settings.client_id,
realm_name=settings.realm,
client_secret_key=settings.client_secret,
)
logger.info("Initializing Keycloak client")
def get_user_id(self, token: str) -> Optional[str]:
info = self.introspect(token)
return info.get("sub", None)

View File

@@ -1,36 +0,0 @@
from cpl.core.utils.get_value import get_value
from cpl.dependency import ServiceProviderABC
class KeycloakUser:
def __init__(self, source: dict):
self._username = get_value(source, "preferred_username", str)
self._email = get_value(source, "email", str)
self._email_verified = get_value(source, "email_verified", bool)
self._name = get_value(source, "name", str)
@property
def username(self) -> str:
return self._username
@property
def email(self) -> str:
return self._email
@property
def email_verified(self) -> bool:
return self._email_verified
@property
def name(self) -> str:
return self._name
# Attrs from keycloak
@property
def id(self) -> str:
from cpl.auth import KeycloakAdmin
keycloak_admin: KeycloakAdmin = ServiceProviderABC.get_global_service(KeycloakAdmin)
return keycloak_admin.get_user_id(self._username)

View File

@@ -1,17 +0,0 @@
from typing import Optional
from cpl.core.configuration.configuration_model_abc import ConfigurationModelABC
class KeycloakSettings(ConfigurationModelABC):
def __init__(
self,
src: Optional[dict] = None,
):
ConfigurationModelABC.__init__(self, src, "KEYCLOAK")
self.option("url", str, required=True)
self.option("client_id", str, required=True)
self.option("realm", str, required=True)
self.option("client_secret", str, required=True)

View File

@@ -1,7 +0,0 @@
from cpl.core.log.wrapped_logger import WrappedLogger
class AuthLogger(WrappedLogger):
def __init__(self):
WrappedLogger.__init__(self, "auth")

View File

@@ -1,36 +0,0 @@
from enum import Enum
class Permissions(Enum):
""" """
"""
Administration
"""
# administrator
administrator = "administrator"
# api keys
api_keys = "api_keys"
api_keys_create = "api_keys.create"
api_keys_update = "api_keys.update"
api_keys_delete = "api_keys.delete"
# users
users = "users"
users_create = "users.create"
users_update = "users.update"
users_delete = "users.delete"
# settings
settings = "settings"
settings_update = "settings.update"
"""
Permissions
"""
# roles
roles = "roles"
roles_create = "roles.create"
roles_update = "roles.update"
roles_delete = "roles.delete"

View File

@@ -1,24 +0,0 @@
from enum import Enum
from typing import Type
class PermissionsRegistry:
_permissions: dict[str, str] = {}
@classmethod
def get(cls):
return cls._permissions.keys()
@classmethod
def descriptions(cls):
return {x: cls._permissions[x] for x in cls._permissions if cls._permissions[x] is not None}
@classmethod
def set(cls, permission: str, description: str = None):
cls._permissions[permission] = description
@classmethod
def with_enum(cls, e: Type[Enum]):
perms = [x.value for x in e]
for perm in perms:
cls.set(str(perm))

View File

@@ -1,120 +0,0 @@
from cpl.auth.permission.permissions import Permissions
from cpl.auth.permission.permissions_registry import PermissionsRegistry
from cpl.auth.schema import (
Permission,
Role,
RolePermission,
ApiKey,
ApiKeyPermission,
PermissionDao,
RoleDao,
RolePermissionDao,
ApiKeyDao,
ApiKeyPermissionDao,
)
from cpl.core.utils.get_value import get_value
from cpl.database.abc.data_seeder_abc import DataSeederABC
from cpl.database.logger import DBLogger
class PermissionSeeder(DataSeederABC):
def __init__(
self,
logger: DBLogger,
permission_dao: PermissionDao,
role_dao: RoleDao,
role_permission_dao: RolePermissionDao,
api_key_dao: ApiKeyDao,
api_key_permission_dao: ApiKeyPermissionDao,
):
DataSeederABC.__init__(self)
self._logger = logger
self._permission_dao = permission_dao
self._role_dao = role_dao
self._role_permission_dao = role_permission_dao
self._api_key_dao = api_key_dao
self._api_key_permission_dao = api_key_permission_dao
async def seed(self):
permissions = await self._permission_dao.get_all()
possible_permissions = [permission for permission in PermissionsRegistry.get()]
if len(permissions) == len(possible_permissions):
self._logger.info("Permissions already existing")
await self._update_missing_descriptions()
return
to_delete = []
for permission in permissions:
if permission.name in possible_permissions:
continue
to_delete.append(permission)
await self._permission_dao.delete_many(to_delete, hard_delete=True)
self._logger.warning("Permissions incomplete")
permission_names = [permission.name for permission in permissions]
await self._permission_dao.create_many(
[
Permission(
0,
permission,
get_value(PermissionsRegistry.descriptions(), permission, str),
)
for permission in possible_permissions
if permission not in permission_names
]
)
await self._update_missing_descriptions()
await self._add_missing_to_role()
await self._add_missing_to_api_key()
async def _add_missing_to_role(self):
admin_role = await self._role_dao.find_single_by([{Role.id: 1}, {Role.name: "admin"}])
if admin_role is None:
return
admin_permissions = await self._role_permission_dao.get_by_role_id(admin_role.id, with_deleted=True)
to_assign = [
RolePermission(0, admin_role.id, permission.id)
for permission in await self._permission_dao.get_all()
if permission.id not in [x.permission_id for x in admin_permissions]
]
await self._role_permission_dao.create_many(to_assign)
async def _add_missing_to_api_key(self):
admin_api_key = await self._api_key_dao.find_single_by([{ApiKey.id: 1}, {ApiKey.identifier: "admin"}])
if admin_api_key is None:
return
admin_permissions = await self._api_key_permission_dao.find_by_api_key_id(admin_api_key.id, with_deleted=True)
to_assign = [
ApiKeyPermission(0, admin_api_key.id, permission.id)
for permission in await self._permission_dao.get_all()
if permission.id not in [x.permission_id for x in admin_permissions]
]
await self._api_key_permission_dao.create_many(to_assign)
async def _update_missing_descriptions(self):
permissions = {
permission.name: permission
for permission in await self._permission_dao.find_by([{Permission.description: None}])
}
to_update = []
if len(permissions) == 0:
return
for key in PermissionsRegistry.descriptions():
if key.value not in permissions:
continue
permissions[key.value].description = PermissionsRegistry.descriptions()[key]
to_update.append(permissions[key.value])
if len(to_update) == 0:
return
await self._permission_dao.update_many(to_update)

View File

@@ -1,15 +0,0 @@
from ._administration.api_key import ApiKey
from ._administration.api_key_dao import ApiKeyDao
from ._administration.auth_user import AuthUser
from ._administration.auth_user_dao import AuthUserDao
from ._permission.api_key_permission import ApiKeyPermission
from ._permission.api_key_permission_dao import ApiKeyPermissionDao
from ._permission.permission import Permission
from ._permission.permission_dao import PermissionDao
from ._permission.role import Role
from ._permission.role_dao import RoleDao
from ._permission.role_permission import RolePermission
from ._permission.role_permission_dao import RolePermissionDao
from ._permission.role_user import RoleUser
from ._permission.role_user_dao import RoleUserDao

View File

@@ -1,66 +0,0 @@
import secrets
from datetime import datetime
from typing import Optional, Union
from async_property import async_property
from cpl.auth.permission.permissions import Permissions
from cpl.core.environment.environment import Environment
from cpl.core.log.logger import Logger
from cpl.core.typing import Id, SerialId
from cpl.core.utils.credential_manager import CredentialManager
from cpl.database.abc.db_model_abc import DbModelABC
from cpl.dependency.service_provider_abc import ServiceProviderABC
_logger = Logger(__name__)
class ApiKey(DbModelABC):
def __init__(
self,
id: SerialId,
identifier: str,
key: Union[str, bytes],
deleted: bool = False,
editor_id: Optional[Id] = None,
created: Optional[datetime] = None,
updated: Optional[datetime] = None,
):
DbModelABC.__init__(self, id, deleted, editor_id, created, updated)
self._identifier = identifier
self._key = key
@property
def identifier(self) -> str:
return self._identifier
@property
def key(self) -> str:
return self._key
@property
def plain_key(self) -> str:
return CredentialManager.decrypt(self.key)
@async_property
async def permissions(self):
from cpl.auth.schema._permission.api_key_permission_dao import ApiKeyPermissionDao
apiKeyPermissionDao = ServiceProviderABC.get_global_provider().get_service(ApiKeyPermissionDao)
return [await x.permission for x in await apiKeyPermissionDao.find_by_api_key_id(self.id)]
async def has_permission(self, permission: Permissions) -> bool:
return permission.value in [x.name for x in await self.permissions]
def set_new_api_key(self):
self._key = self.new_key()
@staticmethod
def new_key() -> str:
return CredentialManager.encrypt(f"api_{secrets.token_urlsafe(Environment.get("API_KEY_LENGTH", int, 64))}")
@classmethod
def new(cls, identifier: str) -> "ApiKey":
return ApiKey(0, identifier, cls.new_key())

View File

@@ -1,29 +0,0 @@
from typing import Optional
from cpl.auth.schema._administration.api_key import ApiKey
from cpl.database import TableManager
from cpl.database.abc import DbModelDaoABC
class ApiKeyDao(DbModelDaoABC[ApiKey]):
def __init__(self):
DbModelDaoABC.__init__(self, ApiKey, TableManager.get("api_keys"))
self.attribute(ApiKey.identifier, str)
self.attribute(ApiKey.key, str, "keystring")
async def get_by_identifier(self, ident: str) -> ApiKey:
result = await self._db.select_map(f"SELECT * FROM {self._table_name} WHERE Identifier = '{ident}'")
return self.to_object(result[0])
async def get_by_key(self, key: str) -> ApiKey:
result = await self._db.select_map(f"SELECT * FROM {self._table_name} WHERE Keystring = '{key}'")
return self.to_object(result[0])
async def find_by_key(self, key: str) -> Optional[ApiKey]:
result = await self._db.select_map(f"SELECT * FROM {self._table_name} WHERE Keystring = '{key}'")
if not result or len(result) == 0:
return None
return self.to_object(result[0])

View File

@@ -1,89 +0,0 @@
import uuid
from datetime import datetime
from typing import Optional
from async_property import async_property
from keycloak import KeycloakGetError
from cpl.auth.keycloak import KeycloakAdmin
from cpl.auth.permission.permissions import Permissions
from cpl.core.typing import SerialId
from cpl.database.abc import DbModelABC
from cpl.database.logger import DBLogger
from cpl.dependency import ServiceProviderABC
class AuthUser(DbModelABC):
def __init__(
self,
id: SerialId,
keycloak_id: str,
deleted: bool = False,
editor_id: Optional[SerialId] = None,
created: Optional[datetime] = None,
updated: Optional[datetime] = None,
):
DbModelABC.__init__(self, id, deleted, editor_id, created, updated)
self._keycloak_id = keycloak_id
@property
def keycloak_id(self) -> str:
return self._keycloak_id
@property
def username(self):
if self._keycloak_id == str(uuid.UUID(int=0)):
return "ANONYMOUS"
try:
keycloak = ServiceProviderABC.get_global_service(KeycloakAdmin)
return keycloak.get_user(self._keycloak_id).get("username")
except KeycloakGetError as e:
return "UNKNOWN"
except Exception as e:
logger = ServiceProviderABC.get_global_service(DBLogger)
logger.error(f"Failed to get user {self._keycloak_id} from Keycloak", e)
return "UNKNOWN"
@property
def email(self):
if self._keycloak_id == str(uuid.UUID(int=0)):
return "ANONYMOUS"
try:
keycloak = ServiceProviderABC.get_global_service(KeycloakAdmin)
return keycloak.get_user(self._keycloak_id).get("email")
except KeycloakGetError as e:
return "UNKNOWN"
except Exception as e:
logger = ServiceProviderABC.get_global_service(DBLogger)
logger.error(f"Failed to get user {self._keycloak_id} from Keycloak", e)
return "UNKNOWN"
@async_property
async def roles(self):
from cpl.auth.schema._permission.role_user_dao import RoleUserDao
role_user_dao: RoleUserDao = ServiceProviderABC.get_global_service(RoleUserDao)
return [await x.role for x in await role_user_dao.get_by_user_id(self.id)]
@async_property
async def permissions(self):
from cpl.auth.schema._administration.auth_user_dao import AuthUserDao
auth_user_dao: AuthUserDao = ServiceProviderABC.get_global_service(AuthUserDao)
return await auth_user_dao.get_permissions(self.id)
async def has_permission(self, permission: Permissions) -> bool:
from cpl.auth.schema._administration.auth_user_dao import AuthUserDao
auth_user_dao: AuthUserDao = ServiceProviderABC.get_global_service(AuthUserDao)
return await auth_user_dao.has_permission(self.id, permission)
async def anonymize(self):
from cpl.auth.schema._administration.auth_user_dao import AuthUserDao
auth_user_dao: AuthUserDao = ServiceProviderABC.get_global_service(AuthUserDao)
self._keycloak_id = str(uuid.UUID(int=0))
await auth_user_dao.update(self)

View File

@@ -1,69 +0,0 @@
from typing import Optional, Union
from cpl.auth.permission.permissions import Permissions
from cpl.auth.schema._administration.auth_user import AuthUser
from cpl.database import TableManager
from cpl.database.abc import DbModelDaoABC
from cpl.database.external_data_temp_table_builder import ExternalDataTempTableBuilder
from cpl.dependency import ServiceProviderABC
class AuthUserDao(DbModelDaoABC[AuthUser]):
def __init__(self):
DbModelDaoABC.__init__(self, AuthUser, TableManager.get("auth_users"))
self.attribute(AuthUser.keycloak_id, str, db_name="keycloakId")
async def get_users():
return [(x.id, x.username, x.email) for x in await self.get_all()]
self.use_external_fields(
ExternalDataTempTableBuilder()
.with_table_name(self._table_name)
.with_field("id", "int", True)
.with_field("username", "text")
.with_field("email", "text")
.with_value_getter(get_users)
)
async def get_by_keycloak_id(self, keycloak_id: str) -> AuthUser:
return await self.get_single_by({AuthUser.keycloak_id: keycloak_id})
async def find_by_keycloak_id(self, keycloak_id: str) -> Optional[AuthUser]:
return await self.find_single_by({AuthUser.keycloak_id: keycloak_id})
async def has_permission(self, user_id: int, permission: Union[Permissions, str]) -> bool:
from cpl.auth.schema._permission.permission_dao import PermissionDao
permission_dao: PermissionDao = ServiceProviderABC.get_global_service(PermissionDao)
p = await permission_dao.get_by_name(permission if isinstance(permission, str) else permission.value)
result = await self._db.select_map(
f"""
SELECT COUNT(*) as count
FROM {TableManager.get("role_users")} ru
JOIN {TableManager.get("role_permissions")} rp ON ru.roleId = rp.roleId
WHERE ru.userId = {user_id}
AND rp.permissionId = {p.id}
AND ru.deleted = FALSE
AND rp.deleted = FALSE;
"""
)
if result is None or len(result) == 0:
return False
return result[0]["count"] > 0
async def get_permissions(self, user_id: int) -> list[Permissions]:
result = await self._db.select_map(
f"""
SELECT p.*
FROM {TableManager.get("permissions")} p
JOIN {TableManager.get("role_permissions")} rp ON p.id = rp.permissionId
JOIN {TableManager.get("role_users")} ru ON rp.roleId = ru.roleId
WHERE ru.userId = {user_id}
AND rp.deleted = FALSE
AND ru.deleted = FALSE;
"""
)
return [Permissions(p["name"]) for p in result]

View File

@@ -1,46 +0,0 @@
from datetime import datetime
from typing import Optional
from async_property import async_property
from cpl.core.typing import SerialId
from cpl.database.abc import DbJoinModelABC
from cpl.dependency import ServiceProviderABC
class ApiKeyPermission(DbJoinModelABC):
def __init__(
self,
id: SerialId,
api_key_id: SerialId,
permission_id: SerialId,
deleted: bool = False,
editor_id: Optional[SerialId] = None,
created: Optional[datetime] = None,
updated: Optional[datetime] = None,
):
DbJoinModelABC.__init__(self, api_key_id, permission_id, id, deleted, editor_id, created, updated)
self._api_key_id = api_key_id
self._permission_id = permission_id
@property
def api_key_id(self) -> int:
return self._api_key_id
@async_property
async def api_key(self):
from cpl.auth.schema._administration.api_key_dao import ApiKeyDao
api_key_dao: ApiKeyDao = ServiceProviderABC.get_global_service(ApiKeyDao)
return await api_key_dao.get_by_id(self._api_key_id)
@property
def permission_id(self) -> int:
return self._permission_id
@async_property
async def permission(self):
from cpl.auth.schema._permission.permission_dao import PermissionDao
permission_dao: PermissionDao = ServiceProviderABC.get_global_service(PermissionDao)
return await permission_dao.get_by_id(self._permission_id)

View File

@@ -1,26 +0,0 @@
from cpl.auth.schema._permission.api_key_permission import ApiKeyPermission
from cpl.database import TableManager
from cpl.database.abc import DbModelDaoABC
class ApiKeyPermissionDao(DbModelDaoABC[ApiKeyPermission]):
def __init__(self):
DbModelDaoABC.__init__(self, ApiKeyPermission, TableManager.get("api_key_permissions"))
self.attribute(ApiKeyPermission.api_key_id, int)
self.attribute(ApiKeyPermission.permission_id, int)
async def find_by_api_key_id(self, api_key_id: int, with_deleted=False) -> list[ApiKeyPermission]:
f = [{ApiKeyPermission.api_key_id: api_key_id}]
if not with_deleted:
f.append({ApiKeyPermission.deleted: False})
return await self.find_by(f)
async def find_by_permission_id(self, permission_id: int, with_deleted=False) -> list[ApiKeyPermission]:
f = [{ApiKeyPermission.permission_id: permission_id}]
if not with_deleted:
f.append({ApiKeyPermission.deleted: False})
return await self.find_by(f)

View File

@@ -1,37 +0,0 @@
from datetime import datetime
from typing import Optional
from cpl.core.typing import SerialId
from cpl.database.abc import DbModelABC
class Permission(DbModelABC):
def __init__(
self,
id: SerialId,
name: str,
description: str,
deleted: bool = False,
editor_id: Optional[SerialId] = None,
created: Optional[datetime] = None,
updated: Optional[datetime] = None,
):
DbModelABC.__init__(self, id, deleted, editor_id, created, updated)
self._name = name
self._description = description
@property
def name(self) -> str:
return self._name
@name.setter
def name(self, value: str):
self._name = value
@property
def description(self) -> str:
return self._description
@description.setter
def description(self, value: str):
self._description = value

View File

@@ -1,18 +0,0 @@
from typing import Optional
from cpl.auth.schema._permission.permission import Permission
from cpl.database import TableManager
from cpl.database.abc import DbModelDaoABC
class PermissionDao(DbModelDaoABC[Permission]):
def __init__(self):
DbModelDaoABC.__init__(self, Permission, TableManager.get("permissions"))
self.attribute(Permission.name, str)
self.attribute(Permission.description, Optional[str])
async def get_by_name(self, name: str) -> Permission:
result = await self._db.select_map(f"SELECT * FROM {self._table_name} WHERE Name = '{name}'")
return self.to_object(result[0])

View File

@@ -1,66 +0,0 @@
from datetime import datetime
from typing import Optional
from async_property import async_property
from cpl.auth.permission.permissions import Permissions
from cpl.core.typing import SerialId
from cpl.database.abc import DbModelABC
from cpl.dependency import ServiceProviderABC
class Role(DbModelABC):
def __init__(
self,
id: SerialId,
name: str,
description: str,
deleted: bool = False,
editor_id: Optional[SerialId] = None,
created: Optional[datetime] = None,
updated: Optional[datetime] = None,
):
DbModelABC.__init__(self, id, deleted, editor_id, created, updated)
self._name = name
self._description = description
@property
def name(self) -> str:
return self._name
@name.setter
def name(self, value: str):
self._name = value
@property
def description(self) -> str:
return self._description
@description.setter
def description(self, value: str):
self._description = value
@async_property
async def permissions(self):
from cpl.auth.schema._permission.role_permission_dao import RolePermissionDao
role_permission_dao: RolePermissionDao = ServiceProviderABC.get_global_service(RolePermissionDao)
return [await x.permission for x in await role_permission_dao.get_by_role_id(self.id)]
@async_property
async def users(self):
from cpl.auth.schema._permission.role_user_dao import RoleUserDao
role_user_dao: RoleUserDao = ServiceProviderABC.get_global_service(RoleUserDao)
return [await x.user for x in await role_user_dao.get_by_role_id(self.id)]
async def has_permission(self, permission: Permissions) -> bool:
from cpl.auth.schema._permission.permission_dao import PermissionDao
from cpl.auth.schema._permission.role_permission_dao import RolePermissionDao
permission_dao: PermissionDao = ServiceProviderABC.get_global_service(PermissionDao)
role_permission_dao: RolePermissionDao = ServiceProviderABC.get_global_service(RolePermissionDao)
p = await permission_dao.get_by_name(permission.value)
return p.id in [x.id for x in await role_permission_dao.get_by_role_id(self.id)]

View File

@@ -1,14 +0,0 @@
from cpl.auth.schema._permission.role import Role
from cpl.database import TableManager
from cpl.database.abc import DbModelDaoABC
class RoleDao(DbModelDaoABC[Role]):
def __init__(self):
DbModelDaoABC.__init__(self, Role, TableManager.get("roles"))
self.attribute(Role.name, str)
self.attribute(Role.description, str)
async def get_by_name(self, name: str) -> Role:
result = await self._db.select_map(f"SELECT * FROM {self._table_name} WHERE Name = '{name}'")
return self.to_object(result[0])

View File

@@ -1,46 +0,0 @@
from datetime import datetime
from typing import Optional
from async_property import async_property
from cpl.core.typing import SerialId
from cpl.database.abc import DbModelABC
from cpl.dependency import ServiceProviderABC
class RolePermission(DbModelABC):
def __init__(
self,
id: SerialId,
role_id: SerialId,
permission_id: SerialId,
deleted: bool = False,
editor_id: Optional[SerialId] = None,
created: Optional[datetime] = None,
updated: Optional[datetime] = None,
):
DbModelABC.__init__(self, id, deleted, editor_id, created, updated)
self._role_id = role_id
self._permission_id = permission_id
@property
def role_id(self) -> int:
return self._role_id
@async_property
async def role(self):
from cpl.auth.schema._permission.role_dao import RoleDao
role_dao: RoleDao = ServiceProviderABC.get_global_service(RoleDao)
return await role_dao.get_by_id(self._role_id)
@property
def permission_id(self) -> int:
return self._permission_id
@async_property
async def permission(self):
from cpl.auth.schema._permission.permission_dao import PermissionDao
permission_dao: PermissionDao = ServiceProviderABC.get_global_service(PermissionDao)
return await permission_dao.get_by_id(self._permission_id)

View File

@@ -1,26 +0,0 @@
from cpl.auth.schema._permission.role_permission import RolePermission
from cpl.database import TableManager
from cpl.database.abc import DbModelDaoABC
class RolePermissionDao(DbModelDaoABC[RolePermission]):
def __init__(self):
DbModelDaoABC.__init__(self, RolePermission, TableManager.get("role_permissions"))
self.attribute(RolePermission.role_id, int)
self.attribute(RolePermission.permission_id, int)
async def get_by_role_id(self, role_id: int, with_deleted=False) -> list[RolePermission]:
f = [{RolePermission.role_id: role_id}]
if not with_deleted:
f.append({RolePermission.deleted: False})
return await self.find_by(f)
async def get_by_permission_id(self, permission_id: int, with_deleted=False) -> list[RolePermission]:
f = [{RolePermission.permission_id: permission_id}]
if not with_deleted:
f.append({RolePermission.deleted: False})
return await self.find_by(f)

View File

@@ -1,46 +0,0 @@
from datetime import datetime
from typing import Optional
from async_property import async_property
from cpl.core.typing import SerialId
from cpl.database.abc import DbJoinModelABC
from cpl.dependency import ServiceProviderABC
class RoleUser(DbJoinModelABC):
def __init__(
self,
id: SerialId,
user_id: SerialId,
role_id: SerialId,
deleted: bool = False,
editor_id: Optional[SerialId] = None,
created: Optional[datetime] = None,
updated: Optional[datetime] = None,
):
DbJoinModelABC.__init__(self, id, user_id, role_id, deleted, editor_id, created, updated)
self._user_id = user_id
self._role_id = role_id
@property
def user_id(self) -> int:
return self._user_id
@async_property
async def user(self):
from cpl.auth.schema._administration.auth_user_dao import AuthUserDao
auth_user_dao: AuthUserDao = ServiceProviderABC.get_global_service(AuthUserDao)
return await auth_user_dao.get_by_id(self._user_id)
@property
def role_id(self) -> int:
return self._role_id
@async_property
async def role(self):
from cpl.auth.schema._permission.role_dao import RoleDao
role_dao: RoleDao = ServiceProviderABC.get_global_service(RoleDao)
return await role_dao.get_by_id(self._role_id)

View File

@@ -1,26 +0,0 @@
from cpl.auth.schema._permission.role_user import RoleUser
from cpl.database import TableManager
from cpl.database.abc import DbModelDaoABC
class RoleUserDao(DbModelDaoABC[RoleUser]):
def __init__(self):
DbModelDaoABC.__init__(self, RoleUser, TableManager.get("role_users"))
self.attribute(RoleUser.role_id, int)
self.attribute(RoleUser.user_id, int)
async def get_by_role_id(self, rid: int, with_deleted=False) -> list[RoleUser]:
f = [{RoleUser.role_id: rid}]
if not with_deleted:
f.append({RoleUser.deleted: False})
return await self.find_by(f)
async def get_by_user_id(self, uid: int, with_deleted=False) -> list[RoleUser]:
f = [{RoleUser.user_id: uid}]
if not with_deleted:
f.append({RoleUser.deleted: False})
return await self.find_by(f)

Some files were not shown because too many files have changed in this diff Show More