Compare commits

..

1 Commits

Author SHA1 Message Date
b2344a8065 Fixed api build
Some checks failed
Build on push / prepare (push) Successful in 8s
Build on push / core (push) Successful in 18s
Build on push / query (push) Successful in 17s
Build on push / dependency (push) Successful in 17s
Build on push / application (push) Successful in 15s
Build on push / translation (push) Successful in 15s
Build on push / database (push) Successful in 18s
Build on push / mail (push) Successful in 18s
Build on push / auth (push) Successful in 17s
Build on push / api (push) Failing after 14s
2025-09-19 21:07:00 +02:00
520 changed files with 3184 additions and 10219 deletions

View File

@@ -16,68 +16,61 @@ jobs:
uses: ./.gitea/workflows/package.yaml
needs: [ prepare, application, auth, core, dependency ]
with:
working_directory: src/api
working_directory: src/cpl-api
secrets: inherit
application:
uses: ./.gitea/workflows/package.yaml
needs: [ prepare, core, dependency ]
with:
working_directory: src/application
working_directory: src/cpl-application
secrets: inherit
auth:
uses: ./.gitea/workflows/package.yaml
needs: [ prepare, core, dependency, database ]
with:
working_directory: src/auth
secrets: inherit
cli:
uses: ./.gitea/workflows/package.yaml
needs: [ prepare, core ]
with:
working_directory: src/cli
working_directory: src/cpl-auth
secrets: inherit
core:
uses: ./.gitea/workflows/package.yaml
needs: [prepare]
with:
working_directory: src/core
working_directory: src/cpl-core
secrets: inherit
database:
uses: ./.gitea/workflows/package.yaml
needs: [ prepare, core, dependency ]
with:
working_directory: src/database
working_directory: src/cpl-database
secrets: inherit
dependency:
uses: ./.gitea/workflows/package.yaml
needs: [ prepare, core ]
with:
working_directory: src/dependency
working_directory: src/cpl-dependency
secrets: inherit
mail:
uses: ./.gitea/workflows/package.yaml
needs: [ prepare, core, dependency ]
with:
working_directory: src/mail
working_directory: src/cpl-mail
secrets: inherit
query:
uses: ./.gitea/workflows/package.yaml
needs: [prepare]
with:
working_directory: src/query
working_directory: src/cpl-query
secrets: inherit
translation:
uses: ./.gitea/workflows/package.yaml
needs: [ prepare, core, dependency ]
with:
working_directory: src/translation
working_directory: src/cpl-translation
secrets: inherit

View File

@@ -36,12 +36,6 @@ jobs:
echo "Set version to $(cat /workspace/sh-edraft.de/cpl/version.txt)"
cat pyproject.toml
- name: Set package version
run: |
sed -i -E "s/^__version__ = \".*\"/__version__ = \"$(cat /workspace/sh-edraft.de/cpl/version.txt)\"/" cpl/*/__init__.py
echo "Set version to $(cat /workspace/sh-edraft.de/cpl/version.txt)"
cat cpl/*/__init__.py
- name: Set pip conf
run: |
cat > .pip.conf <<'EOF'

View File

@@ -25,11 +25,7 @@ jobs:
git tag
DATE=$(date +'%Y.%m.%d')
TAG_COUNT=$(git tag -l "${DATE}.*" | wc -l)
if [ "$TAG_COUNT" -eq 0 ]; then
BUILD_NUMBER=0
else
BUILD_NUMBER=$(($TAG_COUNT + 1))
fi
BUILD_NUMBER=$(($TAG_COUNT + 1))
VERSION_SUFFIX=${{ inputs.version_suffix }}
if [ -n "$VERSION_SUFFIX" ] && [ "$VERSION_SUFFIX" = "dev" ]; then

View File

@@ -1,26 +0,0 @@
name: Test before pr merge
run-name: Test before pr merge
on:
pull_request:
types:
- opened
- edited
- reopened
- synchronize
- ready_for_review
jobs:
test-lint:
runs-on: [ runner ]
container: git.sh-edraft.de/sh-edraft.de/act-runner:latest
steps:
- name: Clone Repository
uses: https://github.com/actions/checkout@v3
with:
token: ${{ secrets.CI_ACCESS_TOKEN }}
- name: Installing black
run: python3.12 -m pip install black
- name: Checking black
run: python3.12 -m black src --check

3
.gitignore vendored
View File

@@ -139,6 +139,3 @@ PythonImportHelper-v2-Completion.json
# cpl unittest stuff
unittests/test_*_playground
# cpl logs
**/logs/*.jsonl

View File

@@ -1,22 +0,0 @@
## Prepare for development
After cloning the repository, run the following commands to set up your development environment:
```bash
python -m venv .venv
source .venv/bin/activate
# On Windows use `.venv\Scripts\activate`
# On Windows with git bash use `source .venv/Scripts/activate`
bash install.sh
```
Install cpl-cli as a development package:
```bash
pip install -e src/core
pip install -e src/cli
# test with:
cpl v
```
When using Pycharm, mark all directories under `src/` as "Sources Root" and `exa` to ensure proper module resolution.

View File

@@ -1,12 +0,0 @@
{
"name": "cpl",
"projects": [
"src/cli/cpl.project.json",
"src/core/cpl.project.json",
"test/cpl.project.json"
],
"defaultProject": "cpl-cli",
"scripts": {
"format": "black src"
}
}

View File

@@ -1,147 +0,0 @@
from starlette.responses import JSONResponse
from cpl.dependency.event_bus import EventBusABC
from cpl.graphql.event_bus.memory import InMemoryEventBus
from queries.cities import CityGraphType, CityFilter, CitySort
from queries.hello import UserGraphType # , UserFilter, UserSort, UserGraphType
from queries.user import UserFilter, UserSort
from cpl.api.api_module import ApiModule
from cpl.application.application_builder import ApplicationBuilder
from cpl.auth.schema import User, Role
from cpl.core.configuration import Configuration
from cpl.core.console import Console
from cpl.core.environment import Environment
from cpl.core.utils.cache import Cache
from cpl.database.mysql.mysql_module import MySQLModule
from cpl.graphql.application.graphql_app import GraphQLApp
from cpl.graphql.auth.graphql_auth_module import GraphQLAuthModule
from cpl.graphql.graphql_module import GraphQLModule
from model.author_dao import AuthorDao
from model.author_query import AuthorGraphType, AuthorFilter, AuthorSort
from model.post_dao import PostDao
from model.post_query import PostFilter, PostSort, PostGraphType, PostMutation, PostSubscription
from permissions import PostPermissions
from queries.hello import HelloQuery
from scoped_service import ScopedService
from service import PingService
from test_data_seeder import TestDataSeeder
def main():
builder = ApplicationBuilder[GraphQLApp](GraphQLApp)
Configuration.add_json_file(f"appsettings.json")
Configuration.add_json_file(f"appsettings.{Environment.get_environment()}.json")
Configuration.add_json_file(f"appsettings.{Environment.get_host_name()}.json", optional=True)
# builder.services.add_logging()
(
builder.services.add_structured_logging()
.add_transient(PingService)
.add_module(MySQLModule)
.add_module(ApiModule)
.add_module(GraphQLModule)
.add_module(GraphQLAuthModule)
.add_scoped(ScopedService)
.add_singleton(EventBusABC, InMemoryEventBus)
.add_cache(User)
.add_cache(Role)
.add_transient(CityGraphType)
.add_transient(CityFilter)
.add_transient(CitySort)
.add_transient(UserGraphType)
.add_transient(UserFilter)
.add_transient(UserSort)
# .add_transient(UserGraphType)
# .add_transient(UserFilter)
# .add_transient(UserSort)
.add_transient(HelloQuery)
# test data
.add_singleton(TestDataSeeder)
# authors
.add_transient(AuthorDao)
.add_transient(AuthorGraphType)
.add_transient(AuthorFilter)
.add_transient(AuthorSort)
# posts
.add_transient(PostDao)
.add_transient(PostGraphType)
.add_transient(PostFilter)
.add_transient(PostSort)
.add_transient(PostMutation)
.add_transient(PostSubscription)
)
app = builder.build()
app.with_logging()
app.with_migrations("./scripts")
app.with_authentication()
app.with_authorization()
app.with_route(
path="/route1",
fn=lambda r: JSONResponse("route1"),
method="GET",
# authentication=True,
# permissions=[Permissions.administrator],
)
app.with_routes_directory("routes")
schema = app.with_graphql()
schema.query.string_field("ping", resolver=lambda: "pong")
schema.query.with_query("hello", HelloQuery)
schema.query.dao_collection_field(AuthorGraphType, AuthorDao, "authors", AuthorFilter, AuthorSort)
(
schema.query.dao_collection_field(PostGraphType, PostDao, "posts", PostFilter, PostSort)
# .with_require_any_permission(PostPermissions.read)
.with_public()
)
schema.mutation.with_mutation("post", PostMutation).with_public()
schema.subscription.with_subscription(PostSubscription)
app.with_auth_root_queries(True)
app.with_auth_root_mutations(True)
app.with_playground()
app.with_graphiql()
app.with_permissions(PostPermissions)
provider = builder.service_provider
user_cache = provider.get_service(Cache[User])
role_cache = provider.get_service(Cache[Role])
if role_cache == user_cache:
raise Exception("Cache service is not working")
s1 = provider.get_service(ScopedService)
s2 = provider.get_service(ScopedService)
if s1.name == s2.name:
raise Exception("Scoped service is not working")
with provider.create_scope() as scope:
s3 = scope.get_service(ScopedService)
s4 = scope.get_service(ScopedService)
if s3.name != s4.name:
raise Exception("Scoped service is not working")
if s1.name == s3.name:
raise Exception("Scoped service is not working")
Console.write_line(
s1.name,
s2.name,
s3.name,
s4.name,
)
app.run()
if __name__ == "__main__":
main()

View File

@@ -1,30 +0,0 @@
from datetime import datetime
from typing import Self
from cpl.core.typing import SerialId
from cpl.database.abc import DbModelABC
class Author(DbModelABC[Self]):
def __init__(
self,
id: int,
first_name: str,
last_name: str,
deleted: bool = False,
editor_id: SerialId | None = None,
created: datetime | None = None,
updated: datetime | None = None,
):
DbModelABC.__init__(self, id, deleted, editor_id, created, updated)
self._first_name = first_name
self._last_name = last_name
@property
def first_name(self) -> str:
return self._first_name
@property
def last_name(self) -> str:
return self._last_name

View File

@@ -1,11 +0,0 @@
from cpl.database.abc import DbModelDaoABC
from model.author import Author
class AuthorDao(DbModelDaoABC):
def __init__(self):
DbModelDaoABC.__init__(self, Author, "authors")
self.attribute(Author.first_name, str, db_name="firstname")
self.attribute(Author.last_name, str, db_name="lastname")

View File

@@ -1,37 +0,0 @@
from cpl.graphql.schema.db_model_graph_type import DbModelGraphType
from cpl.graphql.schema.filter.db_model_filter import DbModelFilter
from cpl.graphql.schema.sort.sort import Sort
from cpl.graphql.schema.sort.sort_order import SortOrder
from model.author import Author
class AuthorFilter(DbModelFilter[Author]):
def __init__(self):
DbModelFilter.__init__(self, public=True)
self.int_field("id")
self.string_field("firstName")
self.string_field("lastName")
class AuthorSort(Sort[Author]):
def __init__(self):
Sort.__init__(self)
self.field("id", SortOrder)
self.field("firstName", SortOrder)
self.field("lastName", SortOrder)
class AuthorGraphType(DbModelGraphType[Author]):
def __init__(self):
DbModelGraphType.__init__(self, public=True)
self.int_field(
"id",
resolver=lambda root: root.id,
).with_public(True)
self.string_field(
"firstName",
resolver=lambda root: root.first_name,
).with_public(True)
self.string_field(
"lastName",
resolver=lambda root: root.last_name,
).with_public(True)

View File

@@ -1,44 +0,0 @@
from datetime import datetime
from typing import Self
from cpl.core.typing import SerialId
from cpl.database.abc import DbModelABC
class Post(DbModelABC[Self]):
def __init__(
self,
id: int,
author_id: SerialId,
title: str,
content: str,
deleted: bool = False,
editor_id: SerialId | None = None,
created: datetime | None = None,
updated: datetime | None = None,
):
DbModelABC.__init__(self, id, deleted, editor_id, created, updated)
self._author_id = author_id
self._title = title
self._content = content
@property
def author_id(self) -> SerialId:
return self._author_id
@property
def title(self) -> str:
return self._title
@title.setter
def title(self, value: str):
self._title = value
@property
def content(self) -> str:
return self._content
@content.setter
def content(self, value: str):
self._content = value

View File

@@ -1,15 +0,0 @@
from cpl.database.abc import DbModelDaoABC
from model.author_dao import AuthorDao
from model.post import Post
class PostDao(DbModelDaoABC[Post]):
def __init__(self, authors: AuthorDao):
DbModelDaoABC.__init__(self, Post, "posts")
self.attribute(Post.author_id, int, db_name="authorId")
self.reference("author", "id", Post.author_id, "authors", authors)
self.attribute(Post.title, str)
self.attribute(Post.content, str)

View File

@@ -1,148 +0,0 @@
from cpl.dependency.event_bus import EventBusABC
from cpl.graphql.query_context import QueryContext
from cpl.graphql.schema.db_model_graph_type import DbModelGraphType
from cpl.graphql.schema.filter.db_model_filter import DbModelFilter
from cpl.graphql.schema.input import Input
from cpl.graphql.schema.mutation import Mutation
from cpl.graphql.schema.sort.sort import Sort
from cpl.graphql.schema.sort.sort_order import SortOrder
from cpl.graphql.schema.subscription import Subscription
from model.author_dao import AuthorDao
from model.author_query import AuthorGraphType, AuthorFilter
from model.post import Post
from model.post_dao import PostDao
class PostFilter(DbModelFilter[Post]):
def __init__(self):
DbModelFilter.__init__(self, public=True)
self.int_field("id")
self.filter_field("author", AuthorFilter)
self.string_field("title")
self.string_field("content")
class PostSort(Sort[Post]):
def __init__(self):
Sort.__init__(self)
self.field("id", SortOrder)
self.field("title", SortOrder)
self.field("content", SortOrder)
class PostGraphType(DbModelGraphType[Post]):
def __init__(self, authors: AuthorDao):
DbModelGraphType.__init__(self, public=True)
self.int_field(
"id",
resolver=lambda root: root.id,
).with_optional().with_public(True)
async def _a(root: Post):
return await authors.get_by_id(root.author_id)
def r_name(ctx: QueryContext):
return ctx.user.username == "admin"
self.object_field("author", AuthorGraphType, resolver=_a).with_public(True) # .with_require_any([], [r_name]))
self.string_field(
"title",
resolver=lambda root: root.title,
).with_public(True)
self.string_field(
"content",
resolver=lambda root: root.content,
).with_public(True)
class PostCreateInput(Input[Post]):
title: str
content: str
author_id: int
def __init__(self):
Input.__init__(self)
self.string_field("title").with_required()
self.string_field("content").with_required()
self.int_field("author_id").with_required()
class PostUpdateInput(Input[Post]):
title: str
content: str
author_id: int
def __init__(self):
Input.__init__(self)
self.int_field("id").with_required()
self.string_field("title").with_required(False)
self.string_field("content").with_required(False)
class PostSubscription(Subscription):
def __init__(self, bus: EventBusABC):
Subscription.__init__(self)
self._bus = bus
def selector(event: Post, info) -> bool:
return event.id == 101
self.subscription_field("postChange", PostGraphType, selector).with_public()
class PostMutation(Mutation):
def __init__(self, posts: PostDao, authors: AuthorDao, bus: EventBusABC):
Mutation.__init__(self)
self._posts = posts
self._authors = authors
self._bus = bus
self.field("create", int, resolver=self.create_post).with_public().with_required().with_argument(
"input",
PostCreateInput,
).with_required()
self.field("update", bool, resolver=self.update_post).with_public().with_required().with_argument(
"input",
PostUpdateInput,
).with_required()
self.field("delete", bool, resolver=self.delete_post).with_public().with_required().with_argument(
"id",
int,
).with_required()
self.field("restore", bool, resolver=self.restore_post).with_public().with_required().with_argument(
"id",
int,
).with_required()
async def create_post(self, input: PostCreateInput) -> int:
return await self._posts.create(Post(0, input.author_id, input.title, input.content))
async def update_post(self, input: PostUpdateInput) -> bool:
post = await self._posts.get_by_id(input.id)
if post is None:
return False
post.title = input.title if input.title is not None else post.title
post.content = input.content if input.content is not None else post.content
await self._posts.update(post)
await self._bus.publish("postChange", post)
return True
async def delete_post(self, id: int) -> bool:
post = await self._posts.get_by_id(id)
if post is None:
return False
await self._posts.delete(post)
return True
async def restore_post(self, id: int) -> bool:
post = await self._posts.get_by_id(id)
if post is None:
return False
await self._posts.restore(post)
return True

View File

@@ -1,8 +0,0 @@
from enum import Enum
class PostPermissions(Enum):
read = "post.read"
write = "post.write"
delete = "post.delete"

View File

@@ -1,39 +0,0 @@
from cpl.graphql.schema.filter.filter import Filter
from cpl.graphql.schema.graph_type import GraphType
from cpl.graphql.schema.sort.sort import Sort
from cpl.graphql.schema.sort.sort_order import SortOrder
class City:
def __init__(self, id: int, name: str):
self.id = id
self.name = name
class CityFilter(Filter[City]):
def __init__(self):
Filter.__init__(self)
self.field("id", int)
self.field("name", str)
class CitySort(Sort[City]):
def __init__(self):
Sort.__init__(self)
self.field("id", SortOrder)
self.field("name", SortOrder)
class CityGraphType(GraphType[City]):
def __init__(self):
GraphType.__init__(self)
self.int_field(
"id",
resolver=lambda root: root.id,
)
self.string_field(
"name",
resolver=lambda root: root.name,
)

View File

@@ -1,70 +0,0 @@
from queries.cities import CityFilter, CitySort, CityGraphType, City
from queries.user import User, UserFilter, UserSort, UserGraphType
from cpl.api.middleware.request import get_request
from cpl.auth.schema import UserDao, User
from cpl.graphql.schema.filter.filter import Filter
from cpl.graphql.schema.graph_type import GraphType
from cpl.graphql.schema.query import Query
from cpl.graphql.schema.sort.sort import Sort
from cpl.graphql.schema.sort.sort_order import SortOrder
users = [User(i, f"User {i}") for i in range(1, 101)]
cities = [City(i, f"City {i}") for i in range(1, 101)]
# class UserFilter(Filter[User]):
# def __init__(self):
# Filter.__init__(self)
# self.field("id", int)
# self.field("username", str)
#
#
# class UserSort(Sort[User]):
# def __init__(self):
# Sort.__init__(self)
# self.field("id", SortOrder)
# self.field("username", SortOrder)
#
# class UserGraphType(GraphType[User]):
#
# def __init__(self):
# GraphType.__init__(self)
#
# self.int_field(
# "id",
# resolver=lambda root: root.id,
# )
# self.string_field(
# "username",
# resolver=lambda root: root.username,
# )
class HelloQuery(Query):
def __init__(self):
Query.__init__(self)
self.string_field(
"message",
resolver=lambda name: f"Hello {name} {get_request().state.request_id}",
).with_argument("name", str, "Name to greet", "world")
self.collection_field(
UserGraphType,
"users",
UserFilter,
UserSort,
resolver=lambda: users,
)
self.collection_field(
CityGraphType,
"cities",
CityFilter,
CitySort,
resolver=lambda: cities,
)
# self.dao_collection_field(
# UserGraphType,
# UserDao,
# "Users",
# UserFilter,
# UserSort,
# )

View File

@@ -1,39 +0,0 @@
from cpl.graphql.schema.filter.filter import Filter
from cpl.graphql.schema.graph_type import GraphType
from cpl.graphql.schema.sort.sort import Sort
from cpl.graphql.schema.sort.sort_order import SortOrder
class User:
def __init__(self, id: int, name: str):
self.id = id
self.name = name
class UserFilter(Filter[User]):
def __init__(self):
Filter.__init__(self)
self.field("id", int)
self.field("name", str)
class UserSort(Sort[User]):
def __init__(self):
Sort.__init__(self)
self.field("id", SortOrder)
self.field("name", SortOrder)
class UserGraphType(GraphType[User]):
def __init__(self):
GraphType.__init__(self)
self.int_field(
"id",
resolver=lambda root: root.id,
)
self.string_field(
"name",
resolver=lambda root: root.name,
)

View File

@@ -1,21 +0,0 @@
from urllib.request import Request
from service import PingService
from starlette.responses import JSONResponse
from cpl.api import APILogger
from cpl.api.router import Router
from cpl.core.console import Console
from cpl.dependency import ServiceProvider
from scoped_service import ScopedService
@Router.authenticate()
# @Router.authorize(permissions=[Permissions.administrator])
# @Router.authorize(policies=["test"])
@Router.get(f"/ping")
async def ping(r: Request, ping: PingService, logger: APILogger, provider: ServiceProvider, scoped: ScopedService):
logger.info(f"Ping: {ping}")
Console.write_line(scoped.name)
return JSONResponse(ping.ping(r))

View File

@@ -1,14 +0,0 @@
from cpl.core.console.console import Console
from cpl.core.utils.string import String
class ScopedService:
def __init__(self):
self._name = String.random(8)
@property
def name(self) -> str:
return self._name
def run(self):
Console.write_line(f"Im {self._name}")

View File

@@ -1,22 +0,0 @@
CREATE TABLE IF NOT EXISTS `authors` (
`id` INT(30) NOT NULL AUTO_INCREMENT,
`firstname` VARCHAR(64) NOT NULL,
`lastname` VARCHAR(64) NOT NULL,
deleted BOOLEAN NOT NULL DEFAULT FALSE,
editorId INT NULL,
created TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY(`id`)
);
CREATE TABLE IF NOT EXISTS `posts` (
`id` INT(30) NOT NULL AUTO_INCREMENT,
`authorId` INT(30) NOT NULL REFERENCES `authors`(`id`) ON DELETE CASCADE,
`title` TEXT NOT NULL,
`content` TEXT NOT NULL,
deleted BOOLEAN NOT NULL DEFAULT FALSE,
editorId INT NULL,
created TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY(`id`)
);

View File

@@ -1,48 +0,0 @@
from faker import Faker
from cpl.database.abc import DataSeederABC
from cpl.query import Enumerable
from model.author import Author
from model.author_dao import AuthorDao
from model.post import Post
from model.post_dao import PostDao
fake = Faker()
class TestDataSeeder(DataSeederABC):
def __init__(self, authors: AuthorDao, posts: PostDao):
DataSeederABC.__init__(self)
self._authors = authors
self._posts = posts
async def seed(self):
if await self._authors.count() == 0:
await self._seed_authors()
if await self._posts.count() == 0:
await self._seed_posts()
async def _seed_authors(self):
authors = Enumerable.range(0, 35).select(
lambda x: Author(
0,
fake.first_name(),
fake.last_name(),
)
).to_list()
await self._authors.create_many(authors, skip_editor=True)
async def _seed_posts(self):
posts = Enumerable.range(0, 100).select(
lambda x: Post(
id=0,
author_id=fake.random_int(min=1, max=35),
title=fake.sentence(nb_words=6),
content=fake.paragraph(nb_sentences=6),
)
).to_list()
await self._posts.create_many(posts, skip_editor=True)

View File

@@ -1,26 +0,0 @@
{
"TimeFormat": {
"DateFormat": "%Y-%m-%d",
"TimeFormat": "%H:%M:%S",
"DateTimeFormat": "%Y-%m-%d %H:%M:%S.%f",
"DateTimeLogFormat": "%Y-%m-%d_%H-%M-%S"
},
"Log": {
"Path": "logs/",
"Filename": "log_$start_time.log",
"ConsoleLevel": "TRACE",
"Level": "TRACE"
},
"Database": {
"Host": "localhost",
"User": "cpl",
"Port": 3306,
"Password": "cpl",
"Database": "cpl",
"Charset": "utf8mb4",
"UseUnicode": "true",
"Buffered": "true"
}
}

View File

@@ -1,15 +0,0 @@
{
"TimeFormat": {
"DateFormat": "%Y-%m-%d",
"TimeFormat": "%H:%M:%S",
"DateTimeFormat": "%Y-%m-%d %H:%M:%S.%f",
"DateTimeLogFormat": "%Y-%m-%d_%H-%M-%S"
},
"Log": {
"Path": "logs/",
"Filename": "log_$start_time.log",
"ConsoleLevel": "ERROR",
"Level": "WARNING"
}
}

View File

@@ -1,45 +0,0 @@
from cpl.application.abc import ApplicationABC
from cpl.core.console.console import Console
from cpl.dependency import ServiceProvider
from test_abc import TestABC
from test_service import TestService
from di_tester_service import DITesterService
from tester import Tester
class Application(ApplicationABC):
def __init__(self, services: ServiceProvider):
ApplicationABC.__init__(self, services)
def _part_of_scoped(self):
ts: TestService = self._services.get_service(TestService)
ts.run()
def main(self):
with self._services.create_scope() as scope:
Console.write_line("Scope1")
ts: TestService = scope.get_service(TestService)
ts.run()
dit: DITesterService = scope.get_service(DITesterService)
dit.run()
if ts.name != dit.name:
raise Exception("DI is broken!")
with self._services.create_scope() as scope:
Console.write_line("Scope2")
ts: TestService = scope.get_service(TestService)
ts.run()
dit: DITesterService = scope.get_service(DITesterService)
dit.run()
if ts.name != dit.name:
raise Exception("DI is broken!")
Console.write_line("Global")
self._part_of_scoped()
#from static_test import StaticTest
#StaticTest.test()
self._services.get_service(Tester)
Console.write_line(self._services.get_services(TestABC))

View File

@@ -1,27 +0,0 @@
from cpl.application.abc import StartupABC
from cpl.dependency import ServiceProvider, ServiceCollection
from di_tester_service import DITesterService
from test1_service import Test1Service
from test2_service import Test2Service
from test_abc import TestABC
from test_service import TestService
from tester import Tester
class Startup(StartupABC):
def __init__(self):
StartupABC.__init__(self)
@staticmethod
def configure_configuration(): ...
@staticmethod
def configure_services(services: ServiceCollection) -> ServiceProvider:
services.add_scoped(TestService)
services.add_scoped(DITesterService)
services.add_singleton(TestABC, Test1Service)
services.add_singleton(TestABC, Test2Service)
services.add_singleton(Tester)
return services.build()

View File

@@ -1,10 +0,0 @@
from cpl.dependency import ServiceProvider, ServiceProvider
from cpl.dependency.inject import inject
from test_service import TestService
class StaticTest:
@staticmethod
@inject
def test(services: ServiceProvider, t1: TestService):
t1.run()

View File

@@ -1,7 +0,0 @@
from cpl.core.console.console import Console
from test_abc import TestABC
class Tester:
def __init__(self, t1: TestABC, t2: TestABC, t3: TestABC, t: list[TestABC]):
Console.write_line("Tester:", t, t1, t2, t3)

View File

@@ -1,8 +0,0 @@
{
"Logging": {
"Path": "logs/",
"Filename": "log_$start_time.log",
"ConsoleLevel": "TRACE",
"Level": "TRACE"
}
}

View File

@@ -1,30 +0,0 @@
import asyncio
from datetime import datetime
from cpl.core.console import Console
from cpl.core.time.cron import Cron
from cpl.core.service.cronjob import CronjobABC
from cpl.core.service.hosted_service import HostedService
class Hosted(HostedService):
def __init__(self):
self._stopped = False
async def start(self):
Console.write_line("Hosted Service Started")
while not self._stopped:
Console.write_line("Hosted Service Running")
await asyncio.sleep(5)
async def stop(self):
Console.write_line("Hosted Service Stopped")
self._stopped = True
class MyCronJob(CronjobABC):
def __init__(self):
CronjobABC.__init__(self, Cron("*/1 * * * *")) # Every minute
async def loop(self):
Console.write_line(f"[{datetime.now()}] Hello from Cronjob!")

View File

@@ -1,10 +0,0 @@
from cpl.core.console import Console
class ScopedService:
def __init__(self):
self.value = "I am a scoped service"
Console.write_line(self.value, self)
def get_value(self):
return self.value

View File

@@ -1,60 +0,0 @@
from cpl.core.console import Console
from cpl.core.utils.benchmark import Benchmark
from cpl.query.enumerable import Enumerable
from cpl.query.immutable_list import ImmutableList
from cpl.query.list import List
from cpl.query.set import Set
def _default():
Console.write_line(Enumerable.empty().to_list())
Console.write_line(Enumerable.range(0, 100).length)
Console.write_line(Enumerable.range(0, 100).to_list())
Console.write_line(Enumerable.range(0, 100).where(lambda x: x % 2 == 0).length)
Console.write_line(
Enumerable.range(0, 100).where(lambda x: x % 2 == 0).to_list().select(lambda x: str(x)).to_list()
)
Console.write_line(List)
s =Enumerable.range(0, 10).to_set()
Console.write_line(s)
s.add(1)
Console.write_line(s)
data = Enumerable(
[
{"name": "Alice", "age": 30},
{"name": "Dave", "age": 35},
{"name": "Charlie", "age": 25},
{"name": "Bob", "age": 25},
]
)
Console.write_line(data.order_by(lambda x: x["age"]).to_list())
Console.write_line(data.order_by(lambda x: x["age"]).then_by(lambda x: x["name"]).to_list())
Console.write_line(data.order_by(lambda x: x["name"]).then_by(lambda x: x["age"]).to_list())
def t_benchmark(data: list):
Benchmark.all("Enumerable", lambda: Enumerable(data).where(lambda x: x % 2 == 0).select(lambda x: x * 2).to_list())
Benchmark.all("Set", lambda: Set(data).where(lambda x: x % 2 == 0).select(lambda x: x * 2).to_list())
Benchmark.all("List", lambda: List(data).where(lambda x: x % 2 == 0).select(lambda x: x * 2).to_list())
Benchmark.all(
"ImmutableList", lambda: ImmutableList(data).where(lambda x: x % 2 == 0).select(lambda x: x * 2).to_list()
)
Benchmark.all("List comprehension", lambda: [x * 2 for x in data if x % 2 == 0])
def main():
N = 1_000_000
data = list(range(N))
t_benchmark(data)
Console.write_line()
_default()
if __name__ == "__main__":
main()

View File

@@ -1,61 +0,0 @@
#!/usr/bin/env bash
set -euo pipefail
# Find and combine requirements from src/cpl-*/requirements.txt,
# filtering out lines whose *package name* starts with "cpl-".
# Works with pinned versions, extras, markers, editable installs, and VCS refs.
shopt -s nullglob
req_files=(src/cpl-*/requirements.txt)
if ((${#req_files[@]} == 0)); then
echo "No requirements files found at src/cpl-*/requirements.txt" >&2
exit 1
fi
tmp_combined="$(mktemp)"
trap 'rm -f "$tmp_combined"' EXIT
# Concatenate, trim comments/whitespace, filter out cpl-* packages, dedupe.
# We keep non-package options/flags/constraints as-is.
awk '
function trim(s){ sub(/^[[:space:]]+/,"",s); sub(/[[:space:]]+$/,"",s); return s }
{
line=$0
# drop full-line comments and strip inline comments
if (line ~ /^[[:space:]]*#/) next
sub(/#[^!].*$/,"",line) # strip trailing comment (simple heuristic)
line=trim(line)
if (line == "") next
# Determine the package *name* even for "-e", extras, pins, markers, or VCS "@"
e = line
sub(/^-e[[:space:]]+/,"",e) # remove editable prefix
# Tokenize up to the first of these separators: space, [ < > = ! ~ ; @
token = e
sub(/\[.*/,"",token) # remove extras quickly
n = split(token, a, /[<>=!~;@[:space:]]/)
name = tolower(a[1])
# If the first token (name) starts with "cpl-", skip this requirement
if (name ~ /^cpl-/) next
print line
}
' "${req_files[@]}" | sort -u > "$tmp_combined"
if ! [ -s "$tmp_combined" ]; then
echo "Nothing to install after filtering out cpl-* packages." >&2
exit 0
fi
echo "Installing dependencies (excluding cpl-*) from:"
printf ' - %s\n' "${req_files[@]}"
echo
echo "Final set to install:"
cat "$tmp_combined"
echo
# Use python -m pip for reliability; change to python3 if needed.
python -m pip install -r "$tmp_combined"

View File

@@ -1,6 +0,0 @@
from .error import APIError, AlreadyExists, EndpointNotImplemented, Forbidden, NotFound, Unauthorized
from .logger import APILogger
from .settings import ApiSettings
from .api_module import ApiModule
__version__ = "1.0.0"

View File

@@ -1 +0,0 @@
from .asgi_middleware_abc import ASGIMiddleware

View File

@@ -1,15 +0,0 @@
from abc import ABC, abstractmethod
from starlette.types import Scope, Receive, Send
class ASGIMiddleware(ABC):
@abstractmethod
def __init__(self, app):
self._app = app
def _call_next(self, scope: Scope, receive: Receive, send: Send):
return self._app(scope, receive, send)
@abstractmethod
async def __call__(self, scope: Scope, receive: Receive, send: Send): ...

View File

@@ -1,45 +0,0 @@
from abc import ABC
from enum import Enum
from typing import Self
from starlette.applications import Starlette
from cpl.api.model.api_route import ApiRoute
from cpl.api.model.validation_match import ValidationMatch
from cpl.api.typing import HTTPMethods, PartialMiddleware, TEndpoint, PolicyInput
from cpl.application.abc.application_abc import ApplicationABC
from cpl.dependency.service_provider import ServiceProvider
from cpl.dependency.typing import Modules
class WebAppABC(ApplicationABC, ABC):
def __init__(self, services: ServiceProvider, modules: Modules, required_modules: list[str | object] = None):
ApplicationABC.__init__(self, services, modules, required_modules)
def with_routes_directory(self, directory: str) -> Self: ...
def with_app(self, app: Starlette) -> Self: ...
def with_routes(
self,
routes: list[ApiRoute],
method: HTTPMethods,
authentication: bool = False,
roles: list[str | Enum] = None,
permissions: list[str | Enum] = None,
policies: list[str] = None,
match: ValidationMatch = None,
) -> Self: ...
def with_route(
self,
path: str,
fn: TEndpoint,
method: HTTPMethods,
authentication: bool = False,
roles: list[str | Enum] = None,
permissions: list[str | Enum] = None,
policies: list[str] = None,
match: ValidationMatch = None,
) -> Self: ...
def with_middleware(self, middleware: PartialMiddleware) -> Self: ...
def with_authentication(self) -> Self: ...
def with_authorization(self, *policies: list[PolicyInput] | PolicyInput) -> Self: ...

View File

@@ -1,22 +0,0 @@
from cpl.api import ApiSettings
from cpl.api.registry.policy import PolicyRegistry
from cpl.api.registry.route import RouteRegistry
from cpl.auth.auth_module import AuthModule
from cpl.auth.permission.permission_module import PermissionsModule
from cpl.database.database_module import DatabaseModule
from cpl.dependency import ServiceCollection
from cpl.dependency.module.module import Module
class ApiModule(Module):
config = [ApiSettings]
singleton = [
PolicyRegistry,
RouteRegistry,
]
@staticmethod
def register(collection: ServiceCollection):
collection.add_module(DatabaseModule)
collection.add_module(AuthModule)
collection.add_module(PermissionsModule)

View File

@@ -1 +0,0 @@
from .web_app import WebApp

View File

@@ -1,275 +0,0 @@
import os
from enum import Enum
from typing import Mapping, Any, Self
import uvicorn
from starlette.applications import Starlette
from starlette.middleware import Middleware
from starlette.middleware.cors import CORSMiddleware
from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.types import ExceptionHandler
from cpl.api.abc.web_app_abc import WebAppABC
from cpl.api.api_module import ApiModule
from cpl.api.error import APIError
from cpl.api.logger import APILogger
from cpl.api.middleware.authentication import AuthenticationMiddleware
from cpl.api.middleware.authorization import AuthorizationMiddleware
from cpl.api.middleware.logging import LoggingMiddleware
from cpl.api.middleware.request import RequestMiddleware
from cpl.api.model.api_route import ApiRoute
from cpl.api.model.policy import Policy
from cpl.api.model.validation_match import ValidationMatch
from cpl.api.registry.policy import PolicyRegistry
from cpl.api.registry.route import RouteRegistry
from cpl.api.router import Router
from cpl.api.settings import ApiSettings
from cpl.api.typing import HTTPMethods, PartialMiddleware, TEndpoint, PolicyInput
from cpl.auth.auth_module import AuthModule
from cpl.auth.permission.permission_module import PermissionsModule
from cpl.core.configuration.configuration import Configuration
from cpl.dependency.inject import inject
from cpl.dependency.service_provider import ServiceProvider
from cpl.dependency.typing import Modules
class WebApp(WebAppABC):
def __init__(self, services: ServiceProvider, modules: Modules, required_modules: list[str | object] = None):
WebAppABC.__init__(
self, services, modules, [AuthModule, PermissionsModule, ApiModule] + (required_modules or [])
)
self._app: Starlette | None = None
self._logger = services.get_service(APILogger)
self._api_settings = Configuration.get(ApiSettings)
self._policies = services.get_service(PolicyRegistry)
self._routes = services.get_service(RouteRegistry)
self._middleware: list[Middleware] = []
self._exception_handlers: Mapping[Any, ExceptionHandler] = {
Exception: self._handle_exception,
APIError: self._handle_exception,
}
self.with_middleware(RequestMiddleware)
self.with_middleware(LoggingMiddleware)
async def _handle_exception(self, request: Request, exc: Exception):
if isinstance(exc, APIError):
self._logger.error(exc)
return JSONResponse({"error": str(exc)}, status_code=exc.status_code)
if hasattr(request.state, "request_id"):
self._logger.error(f"Request {request.state.request_id}", exc)
else:
self._logger.error("Request unknown", exc)
return JSONResponse({"error": str(exc)}, status_code=500)
def _get_allowed_origins(self):
origins = self._api_settings.allowed_origins
if origins is None or origins == "":
self._logger.warning("No allowed origins specified, allowing all origins")
return ["*"]
self._logger.debug(f"Allowed origins: {origins}")
return origins.split(",")
def _check_for_app(self):
if self._app is not None:
raise ValueError("App is already set, cannot add routes or middleware")
def _validate_policies(self):
for rule in Router.get_authorization_rules():
for policy_name in rule["policies"]:
policy = self._policies.get(policy_name)
if not policy:
self._logger.fatal(f"Authorization policy '{policy_name}' not found")
def with_routes_directory(self, directory: str) -> Self:
self._check_for_app()
assert directory is not None, "directory must not be None"
base = directory.replace("/", ".").replace("\\", ".")
for filename in os.listdir(directory):
if not filename.endswith(".py") or filename == "__init__.py":
continue
__import__(f"{base}.{filename[:-3]}")
return self
def with_app(self, app: Starlette) -> Self:
assert app is not None, "app must not be None"
assert isinstance(app, Starlette), "app must be an instance of Starlette"
self._app = app
return self
def with_routes(
self,
routes: list[ApiRoute],
method: HTTPMethods,
authentication: bool = False,
roles: list[str | Enum] = None,
permissions: list[str | Enum] = None,
policies: list[str] = None,
match: ValidationMatch = None,
) -> Self:
self._check_for_app()
assert self._routes is not None, "routes must not be None"
assert all(isinstance(route, ApiRoute) for route in routes), "all routes must be of type ApiRoute"
for route in routes:
self.with_route(
route.path,
route.fn,
method,
authentication,
roles,
permissions,
policies,
match,
)
return self
def with_route(
self,
path: str,
fn: TEndpoint,
method: HTTPMethods,
authentication: bool = False,
roles: list[str | Enum] = None,
permissions: list[str | Enum] = None,
policies: list[str] = None,
match: ValidationMatch = None,
) -> Self:
self._check_for_app()
assert path is not None, "path must not be None"
assert fn is not None, "fn must not be None"
assert method in [
"GET",
"HEAD",
"POST",
"PUT",
"PATCH",
"DELETE",
"OPTIONS",
], "method must be a valid HTTP method"
Router.route(path, method, registry=self._routes)(fn)
if authentication:
Router.authenticate()(fn)
if roles or permissions or policies:
Router.authorize(roles, permissions, policies, match)(fn)
return self
def with_websocket(
self,
path: str,
fn: TEndpoint,
authentication: bool = False,
roles: list[str | Enum] = None,
permissions: list[str | Enum] = None,
policies: list[str] = None,
match: ValidationMatch = None,
) -> Self:
self._check_for_app()
assert path is not None, "path must not be None"
assert fn is not None, "fn must not be None"
Router.websocket(path, registry=self._routes)(fn)
if authentication:
Router.authenticate()(fn)
if roles or permissions or policies:
Router.authorize(roles, permissions, policies, match)(fn)
return self
def with_middleware(self, middleware: PartialMiddleware) -> Self:
self._check_for_app()
if isinstance(middleware, Middleware):
self._middleware.append(inject(middleware))
elif callable(middleware):
self._middleware.append(Middleware(inject(middleware)))
else:
raise ValueError("middleware must be of type starlette.middleware.Middleware or a callable")
return self
def with_authentication(self) -> Self:
self.with_middleware(AuthenticationMiddleware)
return self
def with_authorization(self, *policies: list[PolicyInput] | PolicyInput) -> Self:
self._check_for_app()
if policies:
_policies = []
if not isinstance(policies, list):
policies = list(policies)
for i, policy in enumerate(policies):
if isinstance(policy, dict):
for name, resolver in policy.items():
if not isinstance(name, str):
self._logger.warning(f"Skipping policy at index {i}, name must be a string")
continue
if not callable(resolver):
self._logger.warning(f"Skipping policy {name}, resolver must be callable")
continue
_policies.append(Policy(name, resolver))
continue
_policies.append(policy)
self._policies.extend(_policies)
self.with_middleware(AuthorizationMiddleware)
return self
async def _log_before_startup(self):
self._logger.info(f"Start API on {self._api_settings.host}:{self._api_settings.port}")
async def main(self):
self._logger.debug(f"Preparing API")
self._validate_policies()
if self._app is None:
routes = [route.to_starlette(inject) for route in self._routes.all()]
app = Starlette(
routes=routes,
middleware=[
*self._middleware,
Middleware(
CORSMiddleware,
allow_origins=self._get_allowed_origins(),
allow_methods=["*"],
allow_headers=["*"],
),
],
exception_handlers=self._exception_handlers,
)
else:
app = self._app
await self._log_before_startup()
config = uvicorn.Config(
app, host=self._api_settings.host, port=self._api_settings.port, log_config=None, loop="asyncio"
)
server = uvicorn.Server(config)
await server.serve()
self._logger.info("Shutdown API")

View File

@@ -1,46 +0,0 @@
from http.client import HTTPException
from starlette.responses import JSONResponse
from starlette.types import Scope, Receive, Send
class APIError(HTTPException):
status_code = 500
def __init__(self, message: str = ""):
HTTPException.__init__(self, self.status_code, message)
self._message = message
@property
def error_message(self) -> str:
if self._message:
return f"{type(self).__name__}: {self._message}"
return f"{type(self).__name__}"
async def asgi_response(self, scope: Scope, receive: Receive, send: Send):
r = JSONResponse({"error": self.error_message}, status_code=self.status_code)
return await r(scope, receive, send)
def response(self):
return JSONResponse({"error": self.error_message}, status_code=self.status_code)
class Unauthorized(APIError):
status_code = 401
class Forbidden(APIError):
status_code = 403
class NotFound(APIError):
status_code = 404
class AlreadyExists(APIError):
status_code = 409
class EndpointNotImplemented(APIError):
status_code = 501

View File

@@ -1,7 +0,0 @@
from cpl.core.log.wrapped_logger import WrappedLogger
class APILogger(WrappedLogger):
def __init__(self):
WrappedLogger.__init__(self, "api")

View File

@@ -1,4 +0,0 @@
from .authentication import AuthenticationMiddleware
from .authorization import AuthorizationMiddleware
from .logging import LoggingMiddleware
from .request import RequestMiddleware

View File

@@ -1,93 +0,0 @@
from keycloak import KeycloakAuthenticationError
from starlette.types import Scope, Receive, Send
from cpl.api.abc.asgi_middleware_abc import ASGIMiddleware
from cpl.api.error import Unauthorized
from cpl.api.logger import APILogger
from cpl.api.middleware.request import get_request
from cpl.api.router import Router
from cpl.auth.keycloak import KeycloakClient
from cpl.auth.schema import UserDao, User
from cpl.core.ctx import set_user
class AuthenticationMiddleware(ASGIMiddleware):
def __init__(self, app, logger: APILogger, keycloak: KeycloakClient, user_dao: UserDao):
ASGIMiddleware.__init__(self, app)
self._logger = logger
self._keycloak = keycloak
self._user_dao = user_dao
async def __call__(self, scope: Scope, receive: Receive, send: Send):
request = get_request()
url = request.url.path
if url not in Router.get_auth_required_routes():
self._logger.trace(f"No authentication required for {url}")
return await self._app(scope, receive, send)
user = getattr(request.state, "user", None)
if not user or user.deleted:
self._logger.debug(f"Unauthorized access to {url}, user missing or deleted")
return await Unauthorized("Unauthorized").asgi_response(scope, receive, send)
return await self._call_next(scope, receive, send)
async def _old_call__(self, scope: Scope, receive: Receive, send: Send):
request = get_request()
url = request.url.path
if url not in Router.get_auth_required_routes():
self._logger.trace(f"No authentication required for {url}")
return await self._app(scope, receive, send)
if not request.headers.get("Authorization"):
self._logger.debug(f"Unauthorized access to {url}, missing Authorization header")
return await Unauthorized(f"Missing header Authorization").asgi_response(scope, receive, send)
auth_header = request.headers.get("Authorization", None)
if not auth_header or not auth_header.startswith("Bearer "):
return await Unauthorized("Invalid Authorization header").asgi_response(scope, receive, send)
token = auth_header.split("Bearer ")[1]
if not await self._verify_login(token):
self._logger.debug(f"Unauthorized access to {url}, invalid token")
return await Unauthorized("Invalid token").asgi_response(scope, receive, send)
# check user exists in db, if not create
keycloak_id = self._keycloak.get_user_id(token)
if keycloak_id is None:
return await Unauthorized("Failed to get user id from token").asgi_response(scope, receive, send)
user = await self._get_or_crate_user(keycloak_id)
if user.deleted:
self._logger.debug(f"Unauthorized access to {url}, user is deleted")
return await Unauthorized("User is deleted").asgi_response(scope, receive, send)
request.state.user = user
set_user(user)
return await self._call_next(scope, receive, send)
async def _get_or_crate_user(self, keycloak_id: str) -> User:
existing = await self._user_dao.find_by_keycloak_id(keycloak_id)
if existing is not None:
return existing
user = User(0, keycloak_id)
uid = await self._user_dao.create(user)
return await self._user_dao.get_by_id(uid)
async def _verify_login(self, token: str) -> bool:
try:
token_info = self._keycloak.introspect(token)
return token_info.get("active", False)
except KeycloakAuthenticationError as e:
self._logger.debug(f"Keycloak authentication error: {e}")
return False
except Exception as e:
self._logger.error(f"Unexpected error during token verification: {e}")
return False

View File

@@ -1,71 +0,0 @@
from starlette.types import Scope, Receive, Send
from cpl.api.abc.asgi_middleware_abc import ASGIMiddleware
from cpl.api.error import Unauthorized, Forbidden
from cpl.api.logger import APILogger
from cpl.api.middleware.request import get_request
from cpl.api.model.validation_match import ValidationMatch
from cpl.api.registry.policy import PolicyRegistry
from cpl.api.router import Router
from cpl.auth.schema._administration.user_dao import UserDao
from cpl.core.ctx.user_context import get_user
class AuthorizationMiddleware(ASGIMiddleware):
def __init__(self, app, logger: APILogger, policies: PolicyRegistry, user_dao: UserDao):
ASGIMiddleware.__init__(self, app)
self._logger = logger
self._policies = policies
self._user_dao = user_dao
async def __call__(self, scope: Scope, receive: Receive, send: Send):
request = get_request()
url = request.url.path
if url not in Router.get_authorization_rules_paths():
self._logger.trace(f"No authorization required for {url}")
return await self._app(scope, receive, send)
user = get_user()
if not user:
return await Unauthorized(f"Unknown user").asgi_response(scope, receive, send)
roles = await user.roles
request.state.roles = roles
role_names = [r.name for r in roles]
perms = await user.permissions
request.state.permissions = perms
perm_names = [p.name for p in perms]
for rule in Router.get_authorization_rules():
match = rule["match"]
if rule["roles"]:
if match == ValidationMatch.all and not all(r in role_names for r in rule["roles"]):
return await Forbidden(f"missing roles: {rule["roles"]}").asgi_response(scope, receive, send)
if match == ValidationMatch.any and not any(r in role_names for r in rule["roles"]):
return await Forbidden(f"missing roles: {rule["roles"]}").asgi_response(scope, receive, send)
if rule["permissions"]:
if match == ValidationMatch.all and not all(p in perm_names for p in rule["permissions"]):
return await Forbidden(f"missing permissions: {rule["permissions"]}").asgi_response(
scope, receive, send
)
if match == ValidationMatch.any and not any(p in perm_names for p in rule["permissions"]):
return await Forbidden(f"missing permissions: {rule["permissions"]}").asgi_response(
scope, receive, send
)
for policy_name in rule["policies"]:
policy = self._policies.get(policy_name)
if not policy:
self._logger.warning(f"Authorization policy '{policy_name}' not found")
continue
if not await policy.resolve(user):
return await Forbidden(f"policy {policy.name} failed").asgi_response(scope, receive, send)
return await self._call_next(scope, receive, send)

View File

@@ -1,85 +0,0 @@
import time
from starlette.requests import Request
from starlette.types import Receive, Scope, Send
from cpl.api.abc.asgi_middleware_abc import ASGIMiddleware
from cpl.api.logger import APILogger
from cpl.api.middleware.request import get_request
class LoggingMiddleware(ASGIMiddleware):
def __init__(self, app, logger: APILogger):
ASGIMiddleware.__init__(self, app)
self._logger = logger
async def __call__(self, scope: Scope, receive: Receive, send: Send):
if scope["type"] != "http":
await self._call_next(scope, receive, send)
return
request = get_request()
await self._log_request(request)
start_time = time.time()
response_body = b""
status_code = 500
async def send_wrapper(message):
nonlocal response_body, status_code
if message["type"] == "http.response.start":
status_code = message["status"]
if message["type"] == "http.response.body":
response_body += message.get("body", b"")
await send(message)
await self._call_next(scope, receive, send_wrapper)
duration = (time.time() - start_time) * 1000
await self._log_after_request(request, status_code, duration)
@staticmethod
def _filter_relevant_headers(headers: dict) -> dict:
relevant_keys = {
"content-type",
"host",
"connection",
"user-agent",
"origin",
"referer",
"accept",
}
return {key: value for key, value in headers.items() if key in relevant_keys}
async def _log_request(self, request: Request):
self._logger.debug(
f"Request {getattr(request.state, 'request_id', '-')}: {request.method}@{request.url.path} from {request.client.host}"
)
from cpl.core.ctx.user_context import get_user
user = get_user()
request_info = {
"headers": self._filter_relevant_headers(dict(request.headers)),
"args": dict(request.query_params),
"form-data": (
await request.form()
if request.headers.get("content-type") == "application/x-www-form-urlencoded"
else None
),
"payload": (await request.json() if request.headers.get("content-length") == "0" else None),
"user": f"{user.id}-{user.keycloak_id}" if user else None,
"files": (
{key: file.filename for key, file in (await request.form()).items()} if await request.form() else None
),
}
self._logger.trace(f"Request {getattr(request.state, 'request_id', '-')}: {request_info}")
async def _log_after_request(self, request: Request, status_code: int, duration: float):
self._logger.info(
f"Request finished {getattr(request.state, 'request_id', '-')}: {status_code}-{request.method}@{request.url.path} from {request.client.host} in {duration:.2f}ms"
)

View File

@@ -1,98 +0,0 @@
import time
from contextvars import ContextVar
from typing import Optional, Union
from uuid import uuid4
from starlette.requests import Request
from starlette.types import Scope, Receive, Send
from starlette.websockets import WebSocket
from cpl.api.abc.asgi_middleware_abc import ASGIMiddleware
from cpl.api.logger import APILogger
from cpl.api.typing import TRequest
from cpl.auth.keycloak.keycloak_client import KeycloakClient
from cpl.auth.schema import User
from cpl.auth.schema._administration.user_dao import UserDao
from cpl.core.ctx import set_user
from cpl.dependency.inject import inject
from cpl.dependency.service_provider import ServiceProvider
_request_context: ContextVar[Union[TRequest, None]] = ContextVar("request", default=None)
class RequestMiddleware(ASGIMiddleware):
def __init__(self, app, provider: ServiceProvider, logger: APILogger, keycloak: KeycloakClient, user_dao: UserDao):
ASGIMiddleware.__init__(self, app)
self._provider = provider
self._logger = logger
self._keycloak = keycloak
self._user_dao = user_dao
self._ctx_token = None
async def __call__(self, scope: Scope, receive: Receive, send: Send):
request = Request(scope, receive, send) if scope["type"] != "websocket" else WebSocket(scope, receive, send)
await self.set_request_data(request)
try:
await self._try_set_user(request)
with self._provider.create_scope():
inject(await self._app(scope, receive, send))
finally:
await self.clean_request_data()
async def set_request_data(self, request: TRequest):
request.state.request_id = uuid4()
request.state.start_time = time.time()
self._logger.trace(f"Set new current request: {request.state.request_id}")
self._ctx_token = _request_context.set(request)
async def clean_request_data(self):
request = get_request()
if request is None:
return
if self._ctx_token is None:
return
self._logger.trace(f"Clearing current request: {request.state.request_id}")
_request_context.reset(self._ctx_token)
async def _try_set_user(self, request: Request):
auth_header = request.headers.get("Authorization")
if not auth_header or not auth_header.startswith("Bearer "):
return
token = auth_header.split("Bearer ")[1]
try:
token_info = self._keycloak.introspect(token)
if not token_info.get("active", False):
return
keycloak_id = self._keycloak.get_user_id(token)
if not keycloak_id:
return
user = await self._user_dao.find_by_keycloak_id(keycloak_id)
if not user:
user = User(0, keycloak_id)
uid = await self._user_dao.create(user)
user = await self._user_dao.get_by_id(uid)
if user.deleted:
return
request.state.user = user
set_user(user)
self._logger.trace(f"User {user.id} bound to request {request.state.request_id}")
except Exception as e:
self._logger.debug(f"Silent user binding failed: {e}")
def get_request() -> Optional[TRequest]:
return _request_context.get()

View File

@@ -1,3 +0,0 @@
from .api_route import ApiRoute
from .policy import Policy
from .validation_match import ValidationMatch

View File

@@ -1,43 +0,0 @@
from typing import Callable
from starlette.routing import Route
from cpl.api.typing import HTTPMethods
class ApiRoute:
def __init__(self, path: str, fn: Callable, method: HTTPMethods, **kwargs):
self._path = path
self._fn = fn
self._method = method
self._kwargs = kwargs
@property
def name(self) -> str:
return self._fn.__name__
@property
def fn(self) -> Callable:
return self._fn
@property
def path(self) -> str:
return self._path
@property
def method(self) -> HTTPMethods:
return self._method
@property
def kwargs(self) -> dict:
return self._kwargs
def to_starlette(self, wrap_endpoint: Callable = None) -> Route:
return Route(
self._path,
self._fn if not wrap_endpoint else wrap_endpoint(self._fn),
methods=[self._method],
**self._kwargs,
)

View File

@@ -1,34 +0,0 @@
from asyncio import iscoroutinefunction
from typing import Optional
from cpl.api.typing import PolicyResolver
from cpl.core.ctx import get_user
class Policy:
def __init__(
self,
name: str,
resolver: PolicyResolver = None,
):
self._name = name
self._resolver: Optional[PolicyResolver] = resolver
@property
def name(self) -> str:
return self._name
@property
def resolvers(self) -> PolicyResolver:
return self._resolver
async def resolve(self, *args, **kwargs) -> bool:
if not self._resolver:
return True
if callable(self._resolver):
if iscoroutinefunction(self._resolver):
return await self._resolver(get_user())
return self._resolver(get_user())
return False

View File

@@ -1,6 +0,0 @@
from enum import Enum
class ValidationMatch(Enum):
any = "any"
all = "all"

View File

@@ -1,31 +0,0 @@
from typing import Callable
import starlette.routing
class WebSocketRoute:
def __init__(self, path: str, fn: Callable, **kwargs):
self._path = path
self._fn = fn
self._kwargs = kwargs
@property
def name(self) -> str:
return self._fn.__name__
@property
def fn(self) -> Callable:
return self._fn
@property
def path(self) -> str:
return self._path
@property
def kwargs(self) -> dict:
return self._kwargs
def to_starlette(self, *args) -> starlette.routing.WebSocketRoute:
return starlette.routing.WebSocketRoute(self._path, self._fn)

View File

@@ -1,2 +0,0 @@
from .policy import PolicyRegistry
from .route import RouteRegistry

View File

@@ -1,28 +0,0 @@
from typing import Optional
from cpl.api.model.policy import Policy
from cpl.core.abc.registry_abc import RegistryABC
class PolicyRegistry(RegistryABC):
def __init__(self):
RegistryABC.__init__(self)
def extend(self, items: list[Policy]):
for policy in items:
self.add(policy)
def add(self, item: Policy):
assert isinstance(item, Policy), "policy must be an instance of Policy"
if item.name in self._items:
raise ValueError(f"Policy {item.name} is already registered")
self._items[item.name] = item
def get(self, key: str) -> Optional[Policy]:
return self._items.get(key)
def all(self) -> list[Policy]:
return list(self._items.values())

View File

@@ -1,35 +0,0 @@
from typing import Optional, Union
from cpl.api.model.api_route import ApiRoute
from cpl.api.model.websocket_route import WebSocketRoute
from cpl.core.abc.registry_abc import RegistryABC
TRoute = Union[ApiRoute, WebSocketRoute]
class RouteRegistry(RegistryABC):
def __init__(self):
RegistryABC.__init__(self)
def extend(self, items: list[TRoute]):
for policy in items:
self.add(policy)
def add(self, item: TRoute):
assert isinstance(item, (ApiRoute, WebSocketRoute)), "route must be an instance of ApiRoute"
if item.path in self._items:
raise ValueError(f"ApiRoute {item.path} is already registered")
self._items[item.path] = item
def set(self, item: TRoute):
assert isinstance(item, ApiRoute), "route must be an instance of ApiRoute"
self._items[item.path] = item
def get(self, key: str) -> Optional[TRoute]:
return self._items.get(key)
def all(self) -> list[TRoute]:
return list(self._items.values())

View File

@@ -1,178 +0,0 @@
from enum import Enum
from cpl.api.model.validation_match import ValidationMatch
from cpl.api.registry.route import RouteRegistry
from cpl.api.typing import HTTPMethods
from cpl.dependency import get_provider
class Router:
_auth_required: list[str] = []
_authorization_rules: dict[str, dict] = {}
@classmethod
def get_auth_required_routes(cls) -> list[str]:
return cls._auth_required
@classmethod
def get_authorization_rules_paths(cls) -> list[str]:
return list(cls._authorization_rules.keys())
@classmethod
def get_authorization_rules(cls) -> list[dict]:
return list(cls._authorization_rules.values())
@classmethod
def authenticate(cls):
"""
Decorator to mark a route as requiring authentication.
Usage:
@Route.authenticate()
@Route.get("/example")
async def example_endpoint(request: TRequest):
...
"""
def inner(fn):
route_path = getattr(fn, "_route_path", None)
if route_path and route_path not in cls._auth_required:
cls._auth_required.append(route_path)
return fn
return inner
@classmethod
def authorize(
cls,
roles: list[str | Enum] = None,
permissions: list[str | Enum] = None,
policies: list[str] = None,
match: ValidationMatch = None,
):
"""
Decorator to mark a route as requiring authorization.
Usage:
@Route.authorize()
@Route.get("/example")
async def example_endpoint(request: TRequest):
...
"""
assert roles is None or isinstance(roles, list), "roles must be a list of strings"
assert permissions is None or isinstance(permissions, list), "permissions must be a list of strings"
assert policies is None or isinstance(policies, list), "policies must be a list of strings"
assert match is None or isinstance(match, ValidationMatch), "match must be an instance of ValidationMatch"
if roles is not None:
for role in roles:
if isinstance(role, Enum):
roles[roles.index(role)] = role.value
if permissions is not None:
for perm in permissions:
if isinstance(perm, Enum):
permissions[permissions.index(perm)] = perm.value
def inner(fn):
path = getattr(fn, "_route_path", None)
if not path:
return fn
if path in cls._authorization_rules:
raise ValueError(f"Route {path} is already registered for authorization")
cls._authorization_rules[path] = {
"roles": roles or [],
"permissions": permissions or [],
"policies": policies or [],
"match": match or ValidationMatch.all,
}
return fn
return inner
@classmethod
def websocket(cls, path: str, registry: RouteRegistry = None, **kwargs):
from cpl.api.model.websocket_route import WebSocketRoute
if not registry:
routes = get_provider().get_service(RouteRegistry)
else:
routes = registry
def inner(fn):
routes.add(WebSocketRoute(path, fn, **kwargs))
setattr(fn, "_route_path", path)
return fn
return inner
@classmethod
def route(cls, path: str, method: HTTPMethods, registry: RouteRegistry = None, **kwargs):
from cpl.api.model.api_route import ApiRoute
if not registry:
routes = get_provider().get_service(RouteRegistry)
else:
routes = registry
def inner(fn):
routes.add(ApiRoute(path, fn, method, **kwargs))
setattr(fn, "_route_path", path)
return fn
return inner
@classmethod
def get(cls, path: str, **kwargs):
return cls.route(path, "GET", **kwargs)
@classmethod
def head(cls, path: str, **kwargs):
return cls.route(path, "HEAD", **kwargs)
@classmethod
def post(cls, path: str, **kwargs):
return cls.route(path, "POST", **kwargs)
@classmethod
def put(cls, path: str, **kwargs):
return cls.route(path, "PUT", **kwargs)
@classmethod
def patch(cls, path: str, **kwargs):
return cls.route(path, "PATCH", **kwargs)
@classmethod
def delete(cls, path: str, **kwargs):
return cls.route(path, "DELETE", **kwargs)
@classmethod
def override(cls):
"""
Decorator to override an existing route with the same path.
Usage:
@Route.override()
@Route.get("/example")
async def example_endpoint(request: TRequest):
...
"""
from cpl.api.model.api_route import ApiRoute
routes = get_provider().get_service(RouteRegistry)
def inner(fn):
path = getattr(fn, "_route_path", None)
if path is None:
raise ValueError("Cannot override a route that has not been registered yet")
route = routes.get(path)
if route is None:
raise ValueError(f"Cannot override a route that does not exist: {path}")
routes.add(ApiRoute(path, fn, route.method, **route.kwargs))
setattr(fn, "_route_path", path)
return fn
return inner

View File

@@ -1,22 +0,0 @@
from typing import Union, Literal, Callable, Type, Awaitable
from urllib.request import Request
from starlette.middleware import Middleware
from starlette.responses import Response
from starlette.types import ASGIApp
from starlette.websockets import WebSocket
from cpl.api.abc.asgi_middleware_abc import ASGIMiddleware
from cpl.auth.schema import User
TRequest = Union[Request, WebSocket]
TEndpoint = Callable[[TRequest, ...], Awaitable[Response]] | Callable[[TRequest, ...], Response]
HTTPMethods = Literal["GET", "HEAD", "POST", "PUT", "PATCH", "DELETE", "OPTIONS"]
PartialMiddleware = Union[
ASGIMiddleware,
Type[ASGIMiddleware],
Middleware,
Callable[[ASGIApp], ASGIApp],
]
PolicyResolver = Callable[[User], bool | Awaitable[bool]]
PolicyInput = Union[dict[str, PolicyResolver], "Policy"]

View File

@@ -1,122 +0,0 @@
from abc import ABC, abstractmethod
from typing import Callable, Self
from cpl.application.host import Host
from cpl.core.errors import module_dependency_error
from cpl.core.log.log_level import LogLevel
from cpl.core.log.log_settings import LogSettings
from cpl.core.log.logger_abc import LoggerABC
from cpl.dependency.service_provider import ServiceProvider
from cpl.dependency.typing import TModule
def __not_implemented__(package: str, func: Callable):
raise NotImplementedError(f"Package {package} is required to use {func.__name__} method")
class ApplicationABC(ABC):
r"""ABC for the Application class
Parameters:
services: :class:`cpl.dependency.service_provider.ServiceProvider`
Contains instances of prepared objects
"""
@classmethod
def extend(cls, name: str | Callable, func: Callable[[Self], Self]):
r"""Extend the Application with a custom method
Parameters:
name: :class:`str`
Name of the method
func: :class:`Callable[[Self], Self]`
Function that takes the Application as a parameter and returns it
"""
if callable(name):
name = name.__name__
setattr(cls, name, func)
return cls
@abstractmethod
def __init__(
self, services: ServiceProvider, loaded_modules: set[TModule], required_modules: list[str | object] = None
):
self._services = services
self._modules = loaded_modules
self._required_modules = (
[x.__name__ if not isinstance(x, str) else x for x in required_modules] if required_modules else []
)
def validate_app_required_modules(self):
modules_names = {x.__name__ for x in self._modules}
for module in self._required_modules:
if module in modules_names:
continue
module_dependency_error(
type(self).__name__,
module.__name__ if not isinstance(module, str) else module,
ImportError(
f"Required module '{module}' for application '{self.__class__.__name__}' is not loaded. Load using 'add_module({module})' method."
),
)
def with_logging(self, level: LogLevel = None):
if level is None:
from cpl.core.configuration.configuration import Configuration
settings = Configuration.get(LogSettings)
level = settings.level if settings else LogLevel.info
logger = self._services.get_service(LoggerABC)
logger.set_level(level)
def with_permissions(self, *args):
try:
from cpl.auth import AuthModule
AuthModule.with_permissions(*args)
except ImportError:
__not_implemented__("cpl-auth", self.with_permissions)
def with_migrations(self, *args):
try:
from cpl.database.database_module import DatabaseModule
DatabaseModule.with_migrations(self._services, *args)
except ImportError:
__not_implemented__("cpl-database", self.with_migrations)
def with_extension(self, func: Callable[[Self, ...], None], *args, **kwargs):
r"""Extend the Application with a custom method
Parameters:
func: :class:`Callable[[Self], Self]`
Function that takes the Application as a parameter and returns it
"""
assert func is not None, "func must not be None"
assert callable(func), "func must be callable"
func(self, *args, **kwargs)
def run(self):
r"""Entry point
Called by custom Application.main
"""
try:
for module in self._modules:
if not hasattr(module, "configure") and not callable(getattr(module, "configure")):
continue
module.configure(self._services)
Host.run_app(self.main)
except KeyboardInterrupt:
pass
finally:
logger = self._services.get_service(LoggerABC)
logger.info("Application shutdown")
@abstractmethod
def main(self): ...

View File

@@ -1,99 +0,0 @@
import asyncio
from typing import Callable
from cpl.core.property import classproperty
from cpl.dependency.context import get_provider, use_root_provider
from cpl.dependency.service_collection import ServiceCollection
from cpl.dependency.hosted.startup_task import StartupTask
class Host:
_loop: asyncio.AbstractEventLoop | None = None
_tasks: dict = {}
_service_collection: ServiceCollection | None = None
@classproperty
def services(cls) -> ServiceCollection:
if cls._service_collection is None:
cls._service_collection = ServiceCollection()
return cls._service_collection
@classmethod
def get_provider(cls):
provider = get_provider()
if provider is None:
provider = cls.services.build()
use_root_provider(provider)
return provider
@classmethod
def get_loop(cls) -> asyncio.AbstractEventLoop:
if cls._loop is None:
cls._loop = asyncio.new_event_loop()
asyncio.set_event_loop(cls._loop)
return cls._loop
@classmethod
def run_start_tasks(cls):
provider = cls.get_provider()
tasks = provider.get_services(StartupTask)
loop = cls.get_loop()
for task in tasks:
if asyncio.iscoroutinefunction(task.run):
loop.run_until_complete(task.run())
else:
task.run()
@classmethod
def run_hosted_services(cls):
provider = cls.get_provider()
services = provider.get_hosted_services()
loop = cls.get_loop()
for service in services:
if asyncio.iscoroutinefunction(service.start):
cls._tasks[service] = loop.create_task(service.start())
@classmethod
async def _stop_all(cls):
for service in cls._tasks.keys():
if asyncio.iscoroutinefunction(service.stop):
await service.stop()
for task in cls._tasks.values():
task.cancel()
cls._tasks.clear()
@classmethod
async def wait_for_all(cls):
await asyncio.gather(*cls._tasks.values())
@classmethod
def run_app(cls, func: Callable, *args, **kwargs):
cls.run_start_tasks()
cls.run_hosted_services()
async def runner():
try:
if asyncio.iscoroutinefunction(func):
await func(*args, **kwargs)
else:
func(*args, **kwargs)
except (KeyboardInterrupt, asyncio.CancelledError):
pass
finally:
await cls._stop_all()
cls.get_loop().run_until_complete(runner())
@classmethod
def run(cls, func: Callable, *args, **kwargs):
if asyncio.iscoroutinefunction(func):
return cls.get_loop().run_until_complete(func(*args, **kwargs))
return func(*args, **kwargs)

View File

@@ -1,8 +0,0 @@
from cpl.auth import permission as _permission
from cpl.auth.keycloak.keycloak_admin import KeycloakAdmin as _KeycloakAdmin
from cpl.auth.keycloak.keycloak_client import KeycloakClient as _KeycloakClient
from .auth_module import AuthModule
from .keycloak_settings import KeycloakSettings
from .logger import AuthLogger
__version__ = "1.0.0"

View File

@@ -1,56 +0,0 @@
import os
from enum import Enum
from typing import Type
from cpl.auth.keycloak_settings import KeycloakSettings
from cpl.database.database_module import DatabaseModule
from cpl.database.model.server_type import ServerType, ServerTypes
from cpl.database.mysql.mysql_module import MySQLModule
from cpl.database.postgres.postgres_module import PostgresModule
from cpl.dependency.module.module import Module
from cpl.dependency.service_provider import ServiceProvider
from .keycloak.keycloak_admin import KeycloakAdmin
from .keycloak.keycloak_client import KeycloakClient
from .schema._administration.api_key_dao import ApiKeyDao
from .schema._administration.user_dao import UserDao
from .schema._permission.api_key_permission_dao import ApiKeyPermissionDao
from .schema._permission.permission_dao import PermissionDao
from .schema._permission.role_dao import RoleDao
from .schema._permission.role_permission_dao import RolePermissionDao
from .schema._permission.role_user_dao import RoleUserDao
class AuthModule(Module):
dependencies = [DatabaseModule, (MySQLModule, PostgresModule)]
config = [KeycloakSettings]
singleton = [
KeycloakClient,
KeycloakAdmin,
UserDao,
ApiKeyDao,
ApiKeyPermissionDao,
PermissionDao,
RoleDao,
RolePermissionDao,
RoleUserDao,
]
scoped = []
transient = []
@staticmethod
def configure(provider: ServiceProvider):
paths = {
ServerTypes.POSTGRES: "scripts/postgres",
ServerTypes.MYSQL: "scripts/mysql",
}
DatabaseModule.with_migrations(
provider, str(os.path.join(os.path.dirname(os.path.realpath(__file__)), paths[ServerType.server_type]))
)
@staticmethod
def with_permissions(*permissions: Type[Enum]):
from cpl.auth.permission.permissions_registry import PermissionsRegistry
for perm in permissions:
PermissionsRegistry.with_enum(perm)

View File

@@ -1,23 +0,0 @@
from typing import Optional
from keycloak import KeycloakOpenID
from cpl.auth.logger import AuthLogger
from cpl.auth.keycloak_settings import KeycloakSettings
class KeycloakClient(KeycloakOpenID):
def __init__(self, logger: AuthLogger, settings: KeycloakSettings):
KeycloakOpenID.__init__(
self,
server_url=settings.url,
client_id=settings.client_id,
realm_name=settings.realm,
client_secret_key=settings.client_secret,
)
logger.info("Initializing Keycloak client")
def get_user_id(self, token: str) -> Optional[str]:
info = self.introspect(token)
return info.get("sub", None)

View File

@@ -1,7 +0,0 @@
from cpl.core.log.wrapped_logger import WrappedLogger
class AuthLogger(WrappedLogger):
def __init__(self):
WrappedLogger.__init__(self, "auth")

View File

@@ -1,4 +0,0 @@
from .permission_module import PermissionsModule
from .permission_seeder import PermissionSeeder
from .permissions import Permissions
from .permissions_registry import PermissionsRegistry

View File

@@ -1,18 +0,0 @@
from cpl.auth.auth_module import AuthModule
from cpl.auth.permission.permission_seeder import PermissionSeeder
from cpl.auth.permission.permissions import Permissions
from cpl.auth.permission.permissions_registry import PermissionsRegistry
from cpl.auth.permission.role_seeder import RoleSeeder
from cpl.database.abc.data_seeder_abc import DataSeederABC
from cpl.database.database_module import DatabaseModule
from cpl.dependency.module.module import Module
from cpl.dependency.service_collection import ServiceCollection
class PermissionsModule(Module):
dependencies = [DatabaseModule, AuthModule]
transient = [(DataSeederABC, PermissionSeeder), (DataSeederABC, RoleSeeder)]
@staticmethod
def register(collection: ServiceCollection):
PermissionsRegistry.with_enum(Permissions)

View File

@@ -1,60 +0,0 @@
from cpl.auth.schema import (
Role,
RolePermission,
PermissionDao,
RoleDao,
RolePermissionDao,
ApiKeyDao,
ApiKeyPermissionDao,
UserDao,
RoleUserDao,
RoleUser,
)
from cpl.database.abc.data_seeder_abc import DataSeederABC
from cpl.database.logger import DBLogger
class RoleSeeder(DataSeederABC):
def __init__(
self,
logger: DBLogger,
permission_dao: PermissionDao,
role_dao: RoleDao,
role_permission_dao: RolePermissionDao,
api_key_dao: ApiKeyDao,
api_key_permission_dao: ApiKeyPermissionDao,
user_dao: UserDao,
role_user_dao: RoleUserDao,
):
DataSeederABC.__init__(self)
self._logger = logger
self._permission_dao = permission_dao
self._role_dao = role_dao
self._role_permission_dao = role_permission_dao
self._api_key_dao = api_key_dao
self._api_key_permission_dao = api_key_permission_dao
self._user_dao = user_dao
self._role_user_dao = role_user_dao
async def seed(self):
self._logger.info("Creating admin role")
roles = await self._role_dao.get_all()
if len(roles) == 0:
rid = await self._role_dao.create(Role(0, "admin", "Default admin role"))
permissions = await self._permission_dao.get_all()
await self._role_permission_dao.create_many(
[RolePermission(0, rid, permission.id) for permission in permissions]
)
role = await self._role_dao.get_by_name("admin")
if len(await role.users) > 0:
return
users = await self._user_dao.get_all()
if len(users) == 0:
return
user = users[0]
self._logger.warning(f"Assigning admin role to first user {user.id}")
await self._role_user_dao.create(RoleUser(0, role.id, user.id))

View File

@@ -1,89 +0,0 @@
import uuid
from datetime import datetime
from typing import Optional, Self
from async_property import async_property
from keycloak import KeycloakGetError
from cpl.auth.keycloak import KeycloakAdmin
from cpl.auth.permission.permissions import Permissions
from cpl.core.typing import SerialId
from cpl.database.abc import DbModelABC
from cpl.database.logger import DBLogger
from cpl.dependency import get_provider
class User(DbModelABC[Self]):
def __init__(
self,
id: SerialId,
keycloak_id: str,
deleted: bool = False,
editor_id: SerialId | None = None,
created: datetime | None = None,
updated: datetime | None = None,
):
DbModelABC.__init__(self, id, deleted, editor_id, created, updated)
self._keycloak_id = keycloak_id
@property
def keycloak_id(self) -> str:
return self._keycloak_id
@property
def username(self):
if self._keycloak_id == str(uuid.UUID(int=0)):
return "ANONYMOUS"
try:
keycloak = get_provider().get_service(KeycloakAdmin)
return keycloak.get_user(self._keycloak_id).get("username")
except KeycloakGetError as e:
return "UNKNOWN"
except Exception as e:
logger = get_provider().get_service(DBLogger)
logger.error(f"Failed to get user {self._keycloak_id} from Keycloak", e)
return "UNKNOWN"
@property
def email(self):
if self._keycloak_id == str(uuid.UUID(int=0)):
return "ANONYMOUS"
try:
keycloak = get_provider().get_service(KeycloakAdmin)
return keycloak.get_user(self._keycloak_id).get("email")
except KeycloakGetError as e:
return "UNKNOWN"
except Exception as e:
logger = get_provider().get_service(DBLogger)
logger.error(f"Failed to get user {self._keycloak_id} from Keycloak", e)
return "UNKNOWN"
@async_property
async def roles(self):
from cpl.auth.schema._permission.role_user_dao import RoleUserDao
role_user_dao: RoleUserDao = get_provider().get_service(RoleUserDao)
return [await x.role for x in await role_user_dao.get_by_user_id(self.id)]
@async_property
async def permissions(self):
from cpl.auth.schema._administration.user_dao import UserDao
user_dao: UserDao = get_provider().get_service(UserDao)
return await user_dao.get_permissions(self.id)
async def has_permission(self, permission: Permissions) -> bool:
from cpl.auth.schema._administration.user_dao import UserDao
user_dao: UserDao = get_provider().get_service(UserDao)
return await user_dao.has_permission(self.id, permission)
async def anonymize(self):
from cpl.auth.schema._administration.user_dao import UserDao
user_dao: UserDao = get_provider().get_service(UserDao)
self._keycloak_id = str(uuid.UUID(int=0))
await user_dao.update(self)

View File

@@ -1,44 +0,0 @@
from datetime import datetime
from typing import Self
from async_property import async_property
from cpl.core.typing import SerialId
from cpl.database.abc import DbJoinModelABC
from cpl.dependency import get_provider
class RolePermission(DbJoinModelABC[Self]):
def __init__(
self,
id: SerialId,
role_id: SerialId,
permission_id: SerialId,
deleted: bool = False,
editor_id: SerialId | None = None,
created: datetime | None = None,
updated: datetime | None = None,
):
DbJoinModelABC.__init__(self, id, role_id, permission_id, deleted, editor_id, created, updated)
@property
def role_id(self) -> int:
return self._source_id
@async_property
async def role(self):
from cpl.auth.schema._permission.role_dao import RoleDao
role_dao: RoleDao = get_provider().get_service(RoleDao)
return await role_dao.get_by_id(self._source_id)
@property
def permission_id(self) -> int:
return self._foreign_id
@async_property
async def permission(self):
from cpl.auth.schema._permission.permission_dao import PermissionDao
permission_dao: PermissionDao = get_provider().get_service(PermissionDao)
return await permission_dao.get_by_id(self._foreign_id)

View File

@@ -1,29 +0,0 @@
{
"name": "cpl-cli",
"version": "0.1.0",
"type": "console",
"license": "MIT",
"author": "Sven Heidemann",
"description": "CLI for the CPL library",
"homepage": "",
"keywords": [],
"dependencies": {
"click": "~8.3.0"
},
"devDependencies": {
"black": "~25.9"
},
"references": [],
"main": "cpl/cli/main.py",
"directory": "cpl/cli",
"build": {
"include": [
"_templates/"
],
"exclude": [
"**/__pycache__",
"**/logs",
"**/tests"
]
}
}

View File

@@ -1,9 +0,0 @@
from abc import ABC
class <Name>ABC(ABC):
def __init__(self):
ABC.__init__(self)
print("<schematic> <multi_camelName> initialized")

View File

@@ -1,16 +0,0 @@
from cpl.application.abc import ApplicationABC
from cpl.core.environment import Environment
from cpl.core.log import LoggerABC
from cpl.dependency import ServiceProvider
from cpl.dependency.typing import Modules
class <Name>(ApplicationABC):
def __init__(self, services: ServiceProvider, modules: Modules):
ApplicationABC.__init__(self, services, modules)
self._logger = services.get_service(LoggerABC)
async def main(self):
self._logger.debug(f"Host: {Environment.get_host_name()}")
self._logger.debug(f"Environment: {Environment.get_environment()}")

View File

@@ -1,10 +0,0 @@
from cpl.core.configuration import ConfigurationModelABC
class <Name>Config(ConfigurationModelABC):
def __init__(
self,
src: dict = None,
):
ConfigurationModelABC.__init__(self, src)

View File

@@ -1,9 +0,0 @@
from cpl.core.console import Console
from cpl.core.service import CronjobABC
class CronJob(CronjobABC):
def __init__(self):
CronjobABC.__init__(self, Cron("*/1 * * * *"))
async def loop(self):
Console.write_line(f"[{datetime.now()}] Hello, World!")

View File

@@ -1,9 +0,0 @@
from cpl.database.abc import DbModelDaoABC
class <Name>Dao(DbModelDaoABC[<Name>]):
def __init__(self):
DbModelDaoABC.__init__(self, <Name>, "<multi_name>")
self.attribute(<Name>.name, str)

View File

@@ -1,23 +0,0 @@
from datetime import datetime
from typing import Self
from cpl.core.typing import SerialId
from cpl.database.abc import DbModelABC
class <Name>(DbModelABC[Self]):
def __init__(
self,
id: SerialId,
name: str,
deleted: bool = False,
editor_id: SerialId | None = None,
created: datetime | None = None,
updated: datetime | None = None,
):
DbModelABC.__init__(self, id, deleted, editor_id, created, updated)
self._name = name
@property
def name(self) -> str:
return self._name

View File

@@ -1,29 +0,0 @@
from datetime import datetime
from typing import Self
from cpl.core.typing import SerialId
from cpl.database.abc import DbJoinModelABC
class <Name>Join(DbJoinModelABC[Self]):
def __init__(
self,
id: SerialId,
source_id: SerialId,
reference_id: SerialId,
deleted: bool = False,
editor_id: SerialId | None = None,
created: datetime | None = None,
updated: datetime | None = None,
):
DbJoinModelABC.__init__(self, source_id, reference_id, id, deleted, editor_id, created, updated)
self._source_id = source_id
self._reference_id = reference_id
@property
def source_id(self) -> int:
return self._source_id
@property
def reference(self) -> int:
return self._reference_id

View File

@@ -1,5 +0,0 @@
from enum import Enum
class <Name>Enum(Enum):
KEY = "value"

View File

@@ -1,13 +0,0 @@
from cpl.core.console import Console
from cpl.core.service import HostedService
class <Name>(HostedService):
def __init__(self):
HostedService.__init__(self)
async def start(self):
Console.write_line("Hello, World!")
async def stop(self):
Console.write_line("Goodbye, World!")

View File

@@ -1,7 +0,0 @@
from cpl.core.log.wrapped_logger import WrappedLogger
class <Name>Logger(WrappedLogger):
def __init__(self):
WrappedLogger.__init__(self, "<name>")

View File

@@ -1,17 +0,0 @@
from cpl.dependency import ServiceCollection, ServiceProvider
from cpl.dependency.module import Module
class <Name>Module(Module):
dependencies = []
configuration = []
singleton = []
scoped = []
transient = []
hosted = []
@staticmethod
def register(collection: ServiceCollection): ...
@staticmethod
def configure(provider: ServiceProvider): ...

View File

@@ -1,9 +0,0 @@
import multiprocessing
class <Name>(multiprocessing.Process):
def __init__(self):
multiprocessing.Process.__init__(self)
def run(self): ...

View File

@@ -1,11 +0,0 @@
from cpl.core.pipes import PipeABC
from cpl.core.typing import T
class <Name>Pipe(PipeABC):
@staticmethod
def to_str(value: T, *args) -> str: ...
@staticmethod
def from_str(value: str, *args) -> T: ...

View File

@@ -1,9 +0,0 @@
import threading
class <Name>(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self): ...

View File

@@ -1,18 +0,0 @@
from cpl.api.application import WebApp
from cpl.core.environment import Environment
from cpl.core.log import LoggerABC
from cpl.dependency import ServiceProvider
from cpl.dependency.typing import Modules
class <Name>(WebApp):
def __init__(self, services: ServiceProvider, modules: Modules):
WebApp.__init__(self, services, modules)
self._logger = services.get_service(LoggerABC)
async def main(self):
self._logger.debug(f"Host: {Environment.get_host_name()}")
self._logger.debug(f"Environment: {Environment.get_environment()}")
await super().main()

View File

@@ -1,9 +0,0 @@
from cpl.core.console import Console
def main():
Console.write_line("Hello, World!")
if __name__ == "__main__":
main()

View File

@@ -1,46 +0,0 @@
from cpl.api import ApiModule
from cpl.application import ApplicationBuilder
from cpl.core.configuration import Configuration
from cpl.graphql.application import GraphQLApp
from starlette.responses import JSONResponse
def main():
builder = ApplicationBuilder[GraphQLApp](GraphQLApp)
Configuration.add_json_file(f"appsettings.json", optional=True)
(
builder.services.add_logging()
# uncomment to add preferred database module
# .add_module(MySQLModule)
# .add_module(PostgresModule)
.add_module(ApiModule)
)
app = builder.build()
app.with_logging()
app.with_authentication()
app.with_authorization()
app.with_route(
path="/ping",
fn=lambda r: JSONResponse("pong"),
method="GET",
)
schema = app.with_graphql()
schema.query.string_field("ping", resolver=lambda: "pong")
app.with_auth_root_queries(True)
app.with_auth_root_mutations(True)
app.with_playground()
app.with_graphiql()
app.run()
if __name__ == "__main__":
main()

View File

@@ -1,3 +0,0 @@
class Class1:
def __init__(self): ...

View File

@@ -1,13 +0,0 @@
from cpl.application import Host
from my_hosted_service import MyHostedService
async def main():
Host.services.add_hosted_service(MyHostedService)
Host.run_start_tasks()
Host.run_hosted_services()
await Host.wait_for_all()
if __name__ == "__main__":
Host.run(main)

View File

@@ -1,13 +0,0 @@
from cpl.core.console import Console
from cpl.dependency.hosted import HostedService
class MyHostedService(HostedService):
def __init__(self):
HostedService.__init__(self)
async def start(self):
Console.write_line("Hello, World!")
async def stop(self):
Console.write_line("Goodbye, World!")

View File

@@ -1,37 +0,0 @@
from starlette.responses import JSONResponse
from cpl.api import ApiModule
from cpl.api.application import WebApp
from cpl.application import ApplicationBuilder
from cpl.core.configuration import Configuration
def main():
builder = ApplicationBuilder[WebApp](WebApp)
Configuration.add_json_file(f"appsettings.json", optional=True)
(
builder.services.add_logging()
# uncomment to add preferred database module
# .add_module(MySQLModule)
# .add_module(PostgresModule)
.add_module(ApiModule)
)
app = builder.build()
app.with_logging()
app.with_authentication()
app.with_authorization()
app.with_route(
path="/ping",
fn=lambda r: JSONResponse("pong"),
method="GET",
)
app.run()
if __name__ == "__main__":
main()

View File

@@ -1 +0,0 @@
__version__ = "1.0.0"

View File

@@ -1,55 +0,0 @@
import traceback
import click
from cpl.core.console import Console
class AliasedGroup(click.Group):
def command(self, *args, **kwargs):
aliases = kwargs.pop("aliases", [])
def decorator(f):
cmd = super(AliasedGroup, self).command(*args, **kwargs)(f)
cmd.callback = self._handle_errors(cmd.callback)
for alias in aliases:
self.add_command(cmd, alias)
return cmd
return decorator
def format_commands(self, ctx, formatter):
commands = []
seen = set()
for name, cmd in self.commands.items():
if cmd in seen:
continue
seen.add(cmd)
aliases = [a for a, c in self.commands.items() if c is cmd and a != name]
alias_text = f" (aliases: {', '.join(aliases)})" if aliases else ""
commands.append((name, f"{cmd.short_help or ''}{alias_text}"))
with formatter.section("Commands"):
formatter.write_dl(commands)
@staticmethod
def _handle_errors(f):
def wrapper(*args, **kwargs):
try:
res = f(*args, **kwargs)
Console.write_line()
return res
except Exception as e:
tb = None
if "verbose" in kwargs and kwargs["verbose"]:
tb = traceback.format_exc()
Console.error(str(e), tb)
Console.write_line()
exit(-1)
return wrapper
@click.group(cls=AliasedGroup)
def cli(): ...

View File

@@ -1,27 +0,0 @@
from pathlib import Path
import click
from cpl.cli.cli import cli
from cpl.cli.utils.structure import Structure
from cpl.core.console import Console
@cli.command("add", aliases=["a"])
@click.argument("reference", type=click.STRING, required=True)
@click.argument("target", type=click.STRING, required=True)
@click.option("--verbose", "-v", is_flag=True, help="Enable verbose output")
def add(reference: str, target: str, verbose: bool):
reference_project = Structure.get_project_by_name_or_path(reference)
target_project = Structure.get_project_by_name_or_path(target)
if reference_project.name == target_project.name:
raise ValueError("Cannot add a project as a dependency to itself!")
if reference_project.path in target_project.references:
raise ValueError(f"Project '{reference_project.name}' is already a reference of '{target_project.name}'")
rel_path = Path(reference_project.path).relative_to(Path(target_project.path).parent, walk_up=True)
target_project.references.append(str(rel_path))
target_project.save()
Console.write_line(f"Added '{reference_project.name}' to '{target_project.name}' project")

View File

@@ -1,69 +0,0 @@
import os
import subprocess
from pathlib import Path
import click
from cpl.cli.cli import cli
from cpl.cli.const import PIP_URL
from cpl.cli.utils.pip import Pip
from cpl.cli.utils.structure import Structure
from cpl.core.console import Console
@cli.command("install", aliases=["i"])
@click.argument("package", type=click.STRING, required=False)
@click.argument("project", type=click.STRING, required=False)
@click.option("--dev", is_flag=True, help="Include dev dependencies")
@click.option("--verbose", "-v", is_flag=True, help="Enable verbose output")
def install(package: str, project: str, dev: bool, verbose: bool):
project = Structure.get_project_by_name_or_path(project or "./")
if package is not None:
Console.write_line(f"Installing {package} to '{project.name}':")
try:
Pip.command(
f"install --extra-index-url {PIP_URL}",
package,
verbose=verbose,
path=Path(project.path).parent,
)
except subprocess.CalledProcessError as e:
Console.error(f"Failed to install {package}: exit code {e.returncode}")
return
package_name = Pip.get_package_without_version(package)
installed_version = Pip.get_package_version(package_name, path=project.path)
if installed_version is None:
Console.error(f"Package '{package_name}' not found after installation.")
return
deps = project.dependencies if not dev else project.dev_dependencies
deps[package_name] = Pip.apply_prefix(installed_version, Pip.get_package_full_version(package))
project.save()
Console.write_line(f"Added {package_name}~{installed_version} to project dependencies.")
return
deps: dict = project.dependencies
if dev:
deps.update(project.dev_dependencies)
if not deps:
Console.error("No dependencies to install.")
return
Console.write_line(f"Installing dependencies for '{project.name}':")
for name, version in deps.items():
dep = Pip.normalize_dep(name, version)
Console.write_line(f" -> {dep}")
try:
Pip.command(
"install --extra-index-url https://git.sh-edraft.de/api/packages/sh-edraft.de/pypi/simple/",
dep,
verbose=verbose,
path=project.path,
)
except subprocess.CalledProcessError as e:
Console.error(f"Failed to install {dep}: exit code {e.returncode}")

View File

@@ -1,26 +0,0 @@
from pathlib import Path
import click
from cpl.cli.cli import cli
from cpl.cli.utils.structure import Structure
from cpl.core.console import Console
@cli.command("remove", aliases=["rm"])
@click.argument("reference", type=click.STRING, required=True)
@click.argument("target", type=click.STRING, required=True)
def remove(reference: str, target: str):
reference_project = Structure.get_project_by_name_or_path(reference)
target_project = Structure.get_project_by_name_or_path(target)
if reference_project.name == target_project.name:
raise ValueError("Cannot add a project as a dependency to itself!")
rel_path = str(Path(reference_project.path).relative_to(Path(target_project.path).parent, walk_up=True))
if rel_path not in target_project.references:
raise ValueError(f"Project '{reference_project.name}' isn't a reference of '{target_project.name}'")
target_project.references.remove(rel_path)
target_project.save()
Console.write_line(f"Removed '{reference_project.name}' from '{target_project.name}' project")

View File

@@ -1,41 +0,0 @@
import subprocess
import click
from cpl.cli.cli import cli
from cpl.cli.utils.pip import Pip
from cpl.cli.utils.structure import Structure
from cpl.core.console import Console
@cli.command("uninstall", aliases=["ui"])
@click.argument("package", required=False)
@click.argument("project", type=click.STRING, required=False)
@click.option("--dev", is_flag=True, help="Include dev dependencies")
@click.option("--verbose", "-v", is_flag=True, help="Enable verbose output")
def uninstall(package: str, project: str, dev: bool, verbose: bool):
if package is None:
package = Console.read("Package name to uninstall: ").strip()
project = Structure.get_project_by_name_or_path(project or "./")
deps = project.dependencies if not dev else project.dev_dependencies
try:
Pip.command(
"uninstall -y",
package,
verbose=verbose,
path=project.path,
)
except subprocess.CalledProcessError as e:
Console.error(f"Failed to uninstall {package}: exit code {e.returncode}")
return
if package in deps:
del deps[package]
project.save()
Console.write_line(f"Removed {package} from project dependencies.")
return
Console.write_line(f"Package {package} was not found in project dependencies.")

View File

@@ -1,74 +0,0 @@
import subprocess
import click
from cpl.cli.cli import cli
from cpl.cli.const import PIP_URL
from cpl.cli.utils.pip import Pip
from cpl.cli.utils.structure import Structure
from cpl.core.console import Console
@cli.command("update", aliases=["u"])
@click.argument("package", type=click.STRING, required=False)
@click.argument("project", type=click.STRING, required=False)
@click.option("--dev", is_flag=True, help="Include dev dependencies")
@click.option("--verbose", "-v", is_flag=True, help="Enable verbose output")
def update(package: str, project: str, dev: bool, verbose: bool):
project = Structure.get_project_by_name_or_path(project or "./")
deps: dict = project.dependencies
if dev:
deps = project.dev_dependencies
if package is not None:
if package not in deps:
Console.error(f"Package '{package}' not installed.")
return
old_spec = deps[package]
Console.write_line(f"Updating {package} to '{project.name}':")
try:
Pip.command(
f"install --upgrade --extra-index-url {PIP_URL}" f"{Pip.normalize_dep(package, old_spec)}",
verbose=verbose,
path=project.path,
)
except subprocess.CalledProcessError as e:
Console.error(f"Failed to install {package}: exit code {e.returncode}")
return
installed_version = Pip.get_package_version(package, path=project.path)
if installed_version is None:
Console.error(f"Package '{package}' not found after update.")
return
deps[package] = Pip.apply_prefix(installed_version, old_spec)
project.save()
Console.write_line(f"Updated {package} to {deps[package]}")
return
if not deps:
Console.error("No dependencies to install.")
return
Console.write_line(f"Updating dependencies for '{project.name}':")
for name, version in list(deps.items()):
dep = Pip.normalize_dep(name, version)
Console.write_line(f" -> {dep}")
try:
Pip.command("install --upgrade", dep, verbose=verbose, path=project.path)
except subprocess.CalledProcessError as e:
Console.error(f"Failed to update {dep}: exit code {e.returncode}")
return
installed_version = Pip.get_package_version(name, path=project.path)
if installed_version is None:
Console.error(f"Package '{name}' not found after update.")
continue
deps[name] = Pip.apply_prefix(installed_version, version)
project.save()

View File

@@ -1,73 +0,0 @@
import os.path
import subprocess
from pathlib import Path
import click
from cpl.cli.cli import cli
from cpl.cli.utils.venv import ensure_venv, get_venv_python
from cpl.core.configuration import Configuration
from cpl.core.console import Console
@cli.command("build", aliases=["b"])
@click.argument("project", type=click.STRING, required=False)
@click.option("--dist", "-d", type=str)
@click.option("--skip-py-build", "-spb", is_flag=True, help="Skip toml generation and python build")
@click.option("--verbose", "-v", is_flag=True, help="Enable verbose output")
def build(project: str, dist: str = None, skip_py_build: bool = None, verbose: bool = None):
from cpl.cli.utils.structure import Structure
project = Structure.get_project_by_name_or_path(project or "./")
venv = ensure_venv().absolute()
dist_path = dist or Path(project.path).parent / "dist"
if dist is None and Configuration.get("workspace") is not None:
dist_path = Path(Configuration.get("workspace").path).parent / "dist"
dist_path = Path(dist_path).resolve().absolute()
if verbose:
Console.write_line(f"Creating dist folder at {dist_path}...")
os.makedirs(dist_path, exist_ok=True)
project.do_build(dist_path, verbose)
if skip_py_build:
Console.write_line("\nDone!")
return
Structure.create_pyproject_toml(project, dist_path / project.name)
python = str(get_venv_python(venv))
result = Console.spinner(
"Building python package...",
lambda: subprocess.run(
[
python,
"-m",
"build",
"--outdir",
str(dist_path / project.name),
str(dist_path / project.name),
],
check=True,
stdin=subprocess.DEVNULL if not verbose else None,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
),
)
if result is None:
raise RuntimeError("Build process did not run")
if verbose and result.stdout is not None:
Console.write_line(result.stdout.decode())
if result.returncode != 0 and result.stderr is not None:
if result.stderr is not None:
Console.error(str(result.stderr.decode()))
raise RuntimeError(f"Build process failed with exit code {result.returncode}")
Console.write_line(" Done!")

View File

@@ -1,55 +0,0 @@
import os
import subprocess
from pathlib import Path
import click
from cpl.cli.cli import cli
from cpl.cli.utils.structure import Structure
from cpl.cli.utils.venv import get_venv_python, ensure_venv
from cpl.core.configuration import Configuration
from cpl.core.console import Console
@cli.command("run", aliases=["r"])
@click.argument("project", type=str, required=False, default=None)
@click.argument("args", nargs=-1)
@click.option("--dev", "-d", is_flag=True, help="Use sources instead of build output")
@click.option("--verbose", "-v", is_flag=True, help="Enable verbose output")
def run(project: str, args: list[str], dev: bool, verbose: bool):
project_path = Path("./")
if project is not None:
project_path = (Path("./") / project).resolve().absolute()
project = Structure.get_project_by_name_or_path(str(project_path))
if project.main is None:
Console.error(f"Project {project.name} has no executable")
return
path = str(Path(project.path).parent.resolve().absolute())
executable = project.main
if not dev:
dist_path = Path(project.path).parent / "dist"
if Configuration.get("workspace") is not None:
dist_path = Path(Configuration.get("workspace").path).parent / "dist"
dist_path = Path(dist_path).resolve().absolute()
if verbose:
Console.write_line(f"Creating dist folder at {dist_path}...")
os.makedirs(dist_path, exist_ok=True)
project.do_build(dist_path, verbose)
path = dist_path / project.name
main = project.main.replace(project.directory, "").lstrip("/\\")
executable = path / main
python = str(get_venv_python(ensure_venv()).absolute())
Console.write_line(f"\nStarting project {project.name}...")
if verbose:
Console.write_line(f" with args {args}...")
Console.write_line("\n\n")
subprocess.run([python, executable, *args], cwd=path)

View File

@@ -1,31 +0,0 @@
from pathlib import Path
import click
from cpl.cli.cli import cli
from cpl.cli.utils.live_server.live_server import LiveServer
from cpl.cli.utils.structure import Structure
from cpl.core.console import Console
@cli.command("start", aliases=["s"])
@click.argument("project", type=str, required=False, default=None)
@click.argument("args", nargs=-1)
@click.option("--dev", "-d", is_flag=True, help="Use sources instead of build output")
@click.option("--verbose", "-v", is_flag=True, help="Enable verbose output")
def start(project: str, args: list[str], dev: bool, verbose: bool):
project_path = Path("./")
if project is not None:
project_path = (Path("./") / project).resolve().absolute()
project = Structure.get_project_by_name_or_path(str(project_path))
if project.main is None:
Console.error(f"Project {project.name} has no executable")
return
LiveServer(
project,
args,
dev,
verbose,
).start()

Some files were not shown because too many files have changed in this diff Show More