Compare commits

..

1 Commits

Author SHA1 Message Date
69bbbc8cee Authorization via with_route
Some checks failed
Build on push / prepare (push) Successful in 9s
Build on push / query (push) Successful in 20s
Build on push / core (push) Successful in 20s
Build on push / dependency (push) Successful in 17s
Build on push / mail (push) Successful in 15s
Build on push / application (push) Successful in 18s
Build on push / translation (push) Successful in 18s
Build on push / database (push) Successful in 25s
Build on push / auth (push) Successful in 15s
Build on push / api (push) Successful in 14s
Test before pr merge / test-lint (pull_request) Failing after 6s
2025-09-22 22:03:42 +02:00
516 changed files with 3033 additions and 9265 deletions

View File

@@ -16,68 +16,61 @@ jobs:
uses: ./.gitea/workflows/package.yaml
needs: [ prepare, application, auth, core, dependency ]
with:
working_directory: src/api
working_directory: src/cpl-api
secrets: inherit
application:
uses: ./.gitea/workflows/package.yaml
needs: [ prepare, core, dependency ]
with:
working_directory: src/application
working_directory: src/cpl-application
secrets: inherit
auth:
uses: ./.gitea/workflows/package.yaml
needs: [ prepare, core, dependency, database ]
with:
working_directory: src/auth
secrets: inherit
cli:
uses: ./.gitea/workflows/package.yaml
needs: [ prepare, core ]
with:
working_directory: src/cli
working_directory: src/cpl-auth
secrets: inherit
core:
uses: ./.gitea/workflows/package.yaml
needs: [prepare]
with:
working_directory: src/core
working_directory: src/cpl-core
secrets: inherit
database:
uses: ./.gitea/workflows/package.yaml
needs: [ prepare, core, dependency ]
with:
working_directory: src/database
working_directory: src/cpl-database
secrets: inherit
dependency:
uses: ./.gitea/workflows/package.yaml
needs: [ prepare, core ]
with:
working_directory: src/dependency
working_directory: src/cpl-dependency
secrets: inherit
mail:
uses: ./.gitea/workflows/package.yaml
needs: [ prepare, core, dependency ]
with:
working_directory: src/mail
working_directory: src/cpl-mail
secrets: inherit
query:
uses: ./.gitea/workflows/package.yaml
needs: [prepare]
with:
working_directory: src/query
working_directory: src/cpl-query
secrets: inherit
translation:
uses: ./.gitea/workflows/package.yaml
needs: [ prepare, core, dependency ]
with:
working_directory: src/translation
working_directory: src/cpl-translation
secrets: inherit

View File

@@ -36,12 +36,6 @@ jobs:
echo "Set version to $(cat /workspace/sh-edraft.de/cpl/version.txt)"
cat pyproject.toml
- name: Set package version
run: |
sed -i -E "s/^__version__ = \".*\"/__version__ = \"$(cat /workspace/sh-edraft.de/cpl/version.txt)\"/" cpl/*/__init__.py
echo "Set version to $(cat /workspace/sh-edraft.de/cpl/version.txt)"
cat cpl/*/__init__.py
- name: Set pip conf
run: |
cat > .pip.conf <<'EOF'

View File

@@ -25,11 +25,7 @@ jobs:
git tag
DATE=$(date +'%Y.%m.%d')
TAG_COUNT=$(git tag -l "${DATE}.*" | wc -l)
if [ "$TAG_COUNT" -eq 0 ]; then
BUILD_NUMBER=0
else
BUILD_NUMBER=$(($TAG_COUNT + 1))
fi
BUILD_NUMBER=$(($TAG_COUNT + 1))
VERSION_SUFFIX=${{ inputs.version_suffix }}
if [ -n "$VERSION_SUFFIX" ] && [ "$VERSION_SUFFIX" = "dev" ]; then

3
.gitignore vendored
View File

@@ -139,6 +139,3 @@ PythonImportHelper-v2-Completion.json
# cpl unittest stuff
unittests/test_*_playground
# cpl logs
**/logs/*.jsonl

View File

@@ -1,22 +0,0 @@
## Prepare for development
After cloning the repository, run the following commands to set up your development environment:
```bash
python -m venv .venv
source .venv/bin/activate
# On Windows use `.venv\Scripts\activate`
# On Windows with git bash use `source .venv/Scripts/activate`
bash install.sh
```
Install cpl-cli as a development package:
```bash
pip install -e src/core
pip install -e src/cli
# test with:
cpl v
```
When using Pycharm, mark all directories under `src/` as "Sources Root" and `exa` to ensure proper module resolution.

View File

@@ -1,12 +0,0 @@
{
"name": "cpl",
"projects": [
"src/cli/cpl.project.json",
"src/core/cpl.project.json",
"src/mail/cpl.project.json"
],
"defaultProject": "cpl-cli",
"scripts": {
"format": "black src"
}
}

View File

@@ -1,147 +0,0 @@
from starlette.responses import JSONResponse
from cpl.dependency.event_bus import EventBusABC
from cpl.graphql.event_bus.memory import InMemoryEventBus
from queries.cities import CityGraphType, CityFilter, CitySort
from queries.hello import UserGraphType # , UserFilter, UserSort, UserGraphType
from queries.user import UserFilter, UserSort
from cpl.api.api_module import ApiModule
from cpl.application.application_builder import ApplicationBuilder
from cpl.auth.schema import User, Role
from cpl.core.configuration import Configuration
from cpl.core.console import Console
from cpl.core.environment import Environment
from cpl.core.utils.cache import Cache
from cpl.database.mysql.mysql_module import MySQLModule
from cpl.graphql.application.graphql_app import GraphQLApp
from cpl.graphql.auth.graphql_auth_module import GraphQLAuthModule
from cpl.graphql.graphql_module import GraphQLModule
from model.author_dao import AuthorDao
from model.author_query import AuthorGraphType, AuthorFilter, AuthorSort
from model.post_dao import PostDao
from model.post_query import PostFilter, PostSort, PostGraphType, PostMutation, PostSubscription
from permissions import PostPermissions
from queries.hello import HelloQuery
from scoped_service import ScopedService
from service import PingService
from test_data_seeder import TestDataSeeder
def main():
builder = ApplicationBuilder[GraphQLApp](GraphQLApp)
Configuration.add_json_file(f"appsettings.json")
Configuration.add_json_file(f"appsettings.{Environment.get_environment()}.json")
Configuration.add_json_file(f"appsettings.{Environment.get_host_name()}.json", optional=True)
# builder.services.add_logging()
(
builder.services.add_structured_logging()
.add_transient(PingService)
.add_module(MySQLModule)
.add_module(ApiModule)
.add_module(GraphQLModule)
.add_module(GraphQLAuthModule)
.add_scoped(ScopedService)
.add_singleton(EventBusABC, InMemoryEventBus)
.add_cache(User)
.add_cache(Role)
.add_transient(CityGraphType)
.add_transient(CityFilter)
.add_transient(CitySort)
.add_transient(UserGraphType)
.add_transient(UserFilter)
.add_transient(UserSort)
# .add_transient(UserGraphType)
# .add_transient(UserFilter)
# .add_transient(UserSort)
.add_transient(HelloQuery)
# test data
.add_singleton(TestDataSeeder)
# authors
.add_transient(AuthorDao)
.add_transient(AuthorGraphType)
.add_transient(AuthorFilter)
.add_transient(AuthorSort)
# posts
.add_transient(PostDao)
.add_transient(PostGraphType)
.add_transient(PostFilter)
.add_transient(PostSort)
.add_transient(PostMutation)
.add_transient(PostSubscription)
)
app = builder.build()
app.with_logging()
app.with_migrations("./scripts")
app.with_authentication()
app.with_authorization()
app.with_route(
path="/route1",
fn=lambda r: JSONResponse("route1"),
method="GET",
# authentication=True,
# permissions=[Permissions.administrator],
)
app.with_routes_directory("routes")
schema = app.with_graphql()
schema.query.string_field("ping", resolver=lambda: "pong")
schema.query.with_query("hello", HelloQuery)
schema.query.dao_collection_field(AuthorGraphType, AuthorDao, "authors", AuthorFilter, AuthorSort)
(
schema.query.dao_collection_field(PostGraphType, PostDao, "posts", PostFilter, PostSort)
# .with_require_any_permission(PostPermissions.read)
.with_public()
)
schema.mutation.with_mutation("post", PostMutation).with_public()
schema.subscription.with_subscription(PostSubscription)
app.with_auth_root_queries(True)
app.with_auth_root_mutations(True)
app.with_playground()
app.with_graphiql()
app.with_permissions(PostPermissions)
provider = builder.service_provider
user_cache = provider.get_service(Cache[User])
role_cache = provider.get_service(Cache[Role])
if role_cache == user_cache:
raise Exception("Cache service is not working")
s1 = provider.get_service(ScopedService)
s2 = provider.get_service(ScopedService)
if s1.name == s2.name:
raise Exception("Scoped service is not working")
with provider.create_scope() as scope:
s3 = scope.get_service(ScopedService)
s4 = scope.get_service(ScopedService)
if s3.name != s4.name:
raise Exception("Scoped service is not working")
if s1.name == s3.name:
raise Exception("Scoped service is not working")
Console.write_line(
s1.name,
s2.name,
s3.name,
s4.name,
)
app.run()
if __name__ == "__main__":
main()

View File

@@ -1,30 +0,0 @@
from datetime import datetime
from typing import Self
from cpl.core.typing import SerialId
from cpl.database.abc import DbModelABC
class Author(DbModelABC[Self]):
def __init__(
self,
id: int,
first_name: str,
last_name: str,
deleted: bool = False,
editor_id: SerialId | None = None,
created: datetime | None = None,
updated: datetime | None = None,
):
DbModelABC.__init__(self, id, deleted, editor_id, created, updated)
self._first_name = first_name
self._last_name = last_name
@property
def first_name(self) -> str:
return self._first_name
@property
def last_name(self) -> str:
return self._last_name

View File

@@ -1,11 +0,0 @@
from cpl.database.abc import DbModelDaoABC
from model.author import Author
class AuthorDao(DbModelDaoABC):
def __init__(self):
DbModelDaoABC.__init__(self, Author, "authors")
self.attribute(Author.first_name, str, db_name="firstname")
self.attribute(Author.last_name, str, db_name="lastname")

View File

@@ -1,37 +0,0 @@
from cpl.graphql.schema.db_model_graph_type import DbModelGraphType
from cpl.graphql.schema.filter.db_model_filter import DbModelFilter
from cpl.graphql.schema.sort.sort import Sort
from cpl.graphql.schema.sort.sort_order import SortOrder
from model.author import Author
class AuthorFilter(DbModelFilter[Author]):
def __init__(self):
DbModelFilter.__init__(self, public=True)
self.int_field("id")
self.string_field("firstName")
self.string_field("lastName")
class AuthorSort(Sort[Author]):
def __init__(self):
Sort.__init__(self)
self.field("id", SortOrder)
self.field("firstName", SortOrder)
self.field("lastName", SortOrder)
class AuthorGraphType(DbModelGraphType[Author]):
def __init__(self):
DbModelGraphType.__init__(self, public=True)
self.int_field(
"id",
resolver=lambda root: root.id,
).with_public(True)
self.string_field(
"firstName",
resolver=lambda root: root.first_name,
).with_public(True)
self.string_field(
"lastName",
resolver=lambda root: root.last_name,
).with_public(True)

View File

@@ -1,44 +0,0 @@
from datetime import datetime
from typing import Self
from cpl.core.typing import SerialId
from cpl.database.abc import DbModelABC
class Post(DbModelABC[Self]):
def __init__(
self,
id: int,
author_id: SerialId,
title: str,
content: str,
deleted: bool = False,
editor_id: SerialId | None = None,
created: datetime | None = None,
updated: datetime | None = None,
):
DbModelABC.__init__(self, id, deleted, editor_id, created, updated)
self._author_id = author_id
self._title = title
self._content = content
@property
def author_id(self) -> SerialId:
return self._author_id
@property
def title(self) -> str:
return self._title
@title.setter
def title(self, value: str):
self._title = value
@property
def content(self) -> str:
return self._content
@content.setter
def content(self, value: str):
self._content = value

View File

@@ -1,15 +0,0 @@
from cpl.database.abc import DbModelDaoABC
from model.author_dao import AuthorDao
from model.post import Post
class PostDao(DbModelDaoABC[Post]):
def __init__(self, authors: AuthorDao):
DbModelDaoABC.__init__(self, Post, "posts")
self.attribute(Post.author_id, int, db_name="authorId")
self.reference("author", "id", Post.author_id, "authors", authors)
self.attribute(Post.title, str)
self.attribute(Post.content, str)

View File

@@ -1,148 +0,0 @@
from cpl.dependency.event_bus import EventBusABC
from cpl.graphql.query_context import QueryContext
from cpl.graphql.schema.db_model_graph_type import DbModelGraphType
from cpl.graphql.schema.filter.db_model_filter import DbModelFilter
from cpl.graphql.schema.input import Input
from cpl.graphql.schema.mutation import Mutation
from cpl.graphql.schema.sort.sort import Sort
from cpl.graphql.schema.sort.sort_order import SortOrder
from cpl.graphql.schema.subscription import Subscription
from model.author_dao import AuthorDao
from model.author_query import AuthorGraphType, AuthorFilter
from model.post import Post
from model.post_dao import PostDao
class PostFilter(DbModelFilter[Post]):
def __init__(self):
DbModelFilter.__init__(self, public=True)
self.int_field("id")
self.filter_field("author", AuthorFilter)
self.string_field("title")
self.string_field("content")
class PostSort(Sort[Post]):
def __init__(self):
Sort.__init__(self)
self.field("id", SortOrder)
self.field("title", SortOrder)
self.field("content", SortOrder)
class PostGraphType(DbModelGraphType[Post]):
def __init__(self, authors: AuthorDao):
DbModelGraphType.__init__(self, public=True)
self.int_field(
"id",
resolver=lambda root: root.id,
).with_optional().with_public(True)
async def _a(root: Post):
return await authors.get_by_id(root.author_id)
def r_name(ctx: QueryContext):
return ctx.user.username == "admin"
self.object_field("author", AuthorGraphType, resolver=_a).with_public(True) # .with_require_any([], [r_name]))
self.string_field(
"title",
resolver=lambda root: root.title,
).with_public(True)
self.string_field(
"content",
resolver=lambda root: root.content,
).with_public(True)
class PostCreateInput(Input[Post]):
title: str
content: str
author_id: int
def __init__(self):
Input.__init__(self)
self.string_field("title").with_required()
self.string_field("content").with_required()
self.int_field("author_id").with_required()
class PostUpdateInput(Input[Post]):
title: str
content: str
author_id: int
def __init__(self):
Input.__init__(self)
self.int_field("id").with_required()
self.string_field("title").with_required(False)
self.string_field("content").with_required(False)
class PostSubscription(Subscription):
def __init__(self, bus: EventBusABC):
Subscription.__init__(self)
self._bus = bus
def selector(event: Post, info) -> bool:
return event.id == 101
self.subscription_field("postChange", PostGraphType, selector).with_public()
class PostMutation(Mutation):
def __init__(self, posts: PostDao, authors: AuthorDao, bus: EventBusABC):
Mutation.__init__(self)
self._posts = posts
self._authors = authors
self._bus = bus
self.field("create", int, resolver=self.create_post).with_public().with_required().with_argument(
"input",
PostCreateInput,
).with_required()
self.field("update", bool, resolver=self.update_post).with_public().with_required().with_argument(
"input",
PostUpdateInput,
).with_required()
self.field("delete", bool, resolver=self.delete_post).with_public().with_required().with_argument(
"id",
int,
).with_required()
self.field("restore", bool, resolver=self.restore_post).with_public().with_required().with_argument(
"id",
int,
).with_required()
async def create_post(self, input: PostCreateInput) -> int:
return await self._posts.create(Post(0, input.author_id, input.title, input.content))
async def update_post(self, input: PostUpdateInput) -> bool:
post = await self._posts.get_by_id(input.id)
if post is None:
return False
post.title = input.title if input.title is not None else post.title
post.content = input.content if input.content is not None else post.content
await self._posts.update(post)
await self._bus.publish("postChange", post)
return True
async def delete_post(self, id: int) -> bool:
post = await self._posts.get_by_id(id)
if post is None:
return False
await self._posts.delete(post)
return True
async def restore_post(self, id: int) -> bool:
post = await self._posts.get_by_id(id)
if post is None:
return False
await self._posts.restore(post)
return True

View File

@@ -1,8 +0,0 @@
from enum import Enum
class PostPermissions(Enum):
read = "post.read"
write = "post.write"
delete = "post.delete"

View File

@@ -1,39 +0,0 @@
from cpl.graphql.schema.filter.filter import Filter
from cpl.graphql.schema.graph_type import GraphType
from cpl.graphql.schema.sort.sort import Sort
from cpl.graphql.schema.sort.sort_order import SortOrder
class City:
def __init__(self, id: int, name: str):
self.id = id
self.name = name
class CityFilter(Filter[City]):
def __init__(self):
Filter.__init__(self)
self.field("id", int)
self.field("name", str)
class CitySort(Sort[City]):
def __init__(self):
Sort.__init__(self)
self.field("id", SortOrder)
self.field("name", SortOrder)
class CityGraphType(GraphType[City]):
def __init__(self):
GraphType.__init__(self)
self.int_field(
"id",
resolver=lambda root: root.id,
)
self.string_field(
"name",
resolver=lambda root: root.name,
)

View File

@@ -1,70 +0,0 @@
from queries.cities import CityFilter, CitySort, CityGraphType, City
from queries.user import User, UserFilter, UserSort, UserGraphType
from cpl.api.middleware.request import get_request
from cpl.auth.schema import UserDao, User
from cpl.graphql.schema.filter.filter import Filter
from cpl.graphql.schema.graph_type import GraphType
from cpl.graphql.schema.query import Query
from cpl.graphql.schema.sort.sort import Sort
from cpl.graphql.schema.sort.sort_order import SortOrder
users = [User(i, f"User {i}") for i in range(1, 101)]
cities = [City(i, f"City {i}") for i in range(1, 101)]
# class UserFilter(Filter[User]):
# def __init__(self):
# Filter.__init__(self)
# self.field("id", int)
# self.field("username", str)
#
#
# class UserSort(Sort[User]):
# def __init__(self):
# Sort.__init__(self)
# self.field("id", SortOrder)
# self.field("username", SortOrder)
#
# class UserGraphType(GraphType[User]):
#
# def __init__(self):
# GraphType.__init__(self)
#
# self.int_field(
# "id",
# resolver=lambda root: root.id,
# )
# self.string_field(
# "username",
# resolver=lambda root: root.username,
# )
class HelloQuery(Query):
def __init__(self):
Query.__init__(self)
self.string_field(
"message",
resolver=lambda name: f"Hello {name} {get_request().state.request_id}",
).with_argument("name", str, "Name to greet", "world")
self.collection_field(
UserGraphType,
"users",
UserFilter,
UserSort,
resolver=lambda: users,
)
self.collection_field(
CityGraphType,
"cities",
CityFilter,
CitySort,
resolver=lambda: cities,
)
# self.dao_collection_field(
# UserGraphType,
# UserDao,
# "Users",
# UserFilter,
# UserSort,
# )

View File

@@ -1,39 +0,0 @@
from cpl.graphql.schema.filter.filter import Filter
from cpl.graphql.schema.graph_type import GraphType
from cpl.graphql.schema.sort.sort import Sort
from cpl.graphql.schema.sort.sort_order import SortOrder
class User:
def __init__(self, id: int, name: str):
self.id = id
self.name = name
class UserFilter(Filter[User]):
def __init__(self):
Filter.__init__(self)
self.field("id", int)
self.field("name", str)
class UserSort(Sort[User]):
def __init__(self):
Sort.__init__(self)
self.field("id", SortOrder)
self.field("name", SortOrder)
class UserGraphType(GraphType[User]):
def __init__(self):
GraphType.__init__(self)
self.int_field(
"id",
resolver=lambda root: root.id,
)
self.string_field(
"name",
resolver=lambda root: root.name,
)

View File

@@ -1,21 +0,0 @@
from urllib.request import Request
from service import PingService
from starlette.responses import JSONResponse
from cpl.api import APILogger
from cpl.api.router import Router
from cpl.core.console import Console
from cpl.dependency import ServiceProvider
from scoped_service import ScopedService
@Router.authenticate()
# @Router.authorize(permissions=[Permissions.administrator])
# @Router.authorize(policies=["test"])
@Router.get(f"/ping")
async def ping(r: Request, ping: PingService, logger: APILogger, provider: ServiceProvider, scoped: ScopedService):
logger.info(f"Ping: {ping}")
Console.write_line(scoped.name)
return JSONResponse(ping.ping(r))

View File

@@ -1,14 +0,0 @@
from cpl.core.console.console import Console
from cpl.core.utils.string import String
class ScopedService:
def __init__(self):
self._name = String.random(8)
@property
def name(self) -> str:
return self._name
def run(self):
Console.write_line(f"Im {self._name}")

View File

@@ -1,22 +0,0 @@
CREATE TABLE IF NOT EXISTS `authors` (
`id` INT(30) NOT NULL AUTO_INCREMENT,
`firstname` VARCHAR(64) NOT NULL,
`lastname` VARCHAR(64) NOT NULL,
deleted BOOLEAN NOT NULL DEFAULT FALSE,
editorId INT NULL,
created TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY(`id`)
);
CREATE TABLE IF NOT EXISTS `posts` (
`id` INT(30) NOT NULL AUTO_INCREMENT,
`authorId` INT(30) NOT NULL REFERENCES `authors`(`id`) ON DELETE CASCADE,
`title` TEXT NOT NULL,
`content` TEXT NOT NULL,
deleted BOOLEAN NOT NULL DEFAULT FALSE,
editorId INT NULL,
created TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY(`id`)
);

View File

@@ -1,48 +0,0 @@
from faker import Faker
from cpl.database.abc import DataSeederABC
from cpl.query import Enumerable
from model.author import Author
from model.author_dao import AuthorDao
from model.post import Post
from model.post_dao import PostDao
fake = Faker()
class TestDataSeeder(DataSeederABC):
def __init__(self, authors: AuthorDao, posts: PostDao):
DataSeederABC.__init__(self)
self._authors = authors
self._posts = posts
async def seed(self):
if await self._authors.count() == 0:
await self._seed_authors()
if await self._posts.count() == 0:
await self._seed_posts()
async def _seed_authors(self):
authors = Enumerable.range(0, 35).select(
lambda x: Author(
0,
fake.first_name(),
fake.last_name(),
)
).to_list()
await self._authors.create_many(authors, skip_editor=True)
async def _seed_posts(self):
posts = Enumerable.range(0, 100).select(
lambda x: Post(
id=0,
author_id=fake.random_int(min=1, max=35),
title=fake.sentence(nb_words=6),
content=fake.paragraph(nb_sentences=6),
)
).to_list()
await self._posts.create_many(posts, skip_editor=True)

View File

@@ -1,45 +0,0 @@
from cpl.application.abc import ApplicationABC
from cpl.core.console.console import Console
from cpl.dependency import ServiceProvider
from test_abc import TestABC
from test_service import TestService
from di_tester_service import DITesterService
from tester import Tester
class Application(ApplicationABC):
def __init__(self, services: ServiceProvider):
ApplicationABC.__init__(self, services)
def _part_of_scoped(self):
ts: TestService = self._services.get_service(TestService)
ts.run()
def main(self):
with self._services.create_scope() as scope:
Console.write_line("Scope1")
ts: TestService = scope.get_service(TestService)
ts.run()
dit: DITesterService = scope.get_service(DITesterService)
dit.run()
if ts.name != dit.name:
raise Exception("DI is broken!")
with self._services.create_scope() as scope:
Console.write_line("Scope2")
ts: TestService = scope.get_service(TestService)
ts.run()
dit: DITesterService = scope.get_service(DITesterService)
dit.run()
if ts.name != dit.name:
raise Exception("DI is broken!")
Console.write_line("Global")
self._part_of_scoped()
#from static_test import StaticTest
#StaticTest.test()
self._services.get_service(Tester)
Console.write_line(self._services.get_services(TestABC))

View File

@@ -1,27 +0,0 @@
from cpl.application.abc import StartupABC
from cpl.dependency import ServiceProvider, ServiceCollection
from di_tester_service import DITesterService
from test1_service import Test1Service
from test2_service import Test2Service
from test_abc import TestABC
from test_service import TestService
from tester import Tester
class Startup(StartupABC):
def __init__(self):
StartupABC.__init__(self)
@staticmethod
def configure_configuration(): ...
@staticmethod
def configure_services(services: ServiceCollection) -> ServiceProvider:
services.add_scoped(TestService)
services.add_scoped(DITesterService)
services.add_singleton(TestABC, Test1Service)
services.add_singleton(TestABC, Test2Service)
services.add_singleton(Tester)
return services.build()

View File

@@ -1,10 +0,0 @@
from cpl.dependency import ServiceProvider, ServiceProvider
from cpl.dependency.inject import inject
from test_service import TestService
class StaticTest:
@staticmethod
@inject
def test(services: ServiceProvider, t1: TestService):
t1.run()

View File

@@ -1,7 +0,0 @@
from cpl.core.console.console import Console
from test_abc import TestABC
class Tester:
def __init__(self, t1: TestABC, t2: TestABC, t3: TestABC, t: list[TestABC]):
Console.write_line("Tester:", t, t1, t2, t3)

View File

@@ -1,30 +0,0 @@
import asyncio
from datetime import datetime
from cpl.core.console import Console
from cpl.core.time.cron import Cron
from cpl.core.service.cronjob import CronjobABC
from cpl.core.service.hosted_service import HostedService
class Hosted(HostedService):
def __init__(self):
self._stopped = False
async def start(self):
Console.write_line("Hosted Service Started")
while not self._stopped:
Console.write_line("Hosted Service Running")
await asyncio.sleep(5)
async def stop(self):
Console.write_line("Hosted Service Stopped")
self._stopped = True
class MyCronJob(CronjobABC):
def __init__(self):
CronjobABC.__init__(self, Cron("*/1 * * * *")) # Every minute
async def loop(self):
Console.write_line(f"[{datetime.now()}] Hello from Cronjob!")

View File

@@ -1,10 +0,0 @@
from cpl.core.console import Console
class ScopedService:
def __init__(self):
self.value = "I am a scoped service"
Console.write_line(self.value, self)
def get_value(self):
return self.value

View File

@@ -1,60 +0,0 @@
from cpl.core.console import Console
from cpl.core.utils.benchmark import Benchmark
from cpl.query.enumerable import Enumerable
from cpl.query.immutable_list import ImmutableList
from cpl.query.list import List
from cpl.query.set import Set
def _default():
Console.write_line(Enumerable.empty().to_list())
Console.write_line(Enumerable.range(0, 100).length)
Console.write_line(Enumerable.range(0, 100).to_list())
Console.write_line(Enumerable.range(0, 100).where(lambda x: x % 2 == 0).length)
Console.write_line(
Enumerable.range(0, 100).where(lambda x: x % 2 == 0).to_list().select(lambda x: str(x)).to_list()
)
Console.write_line(List)
s =Enumerable.range(0, 10).to_set()
Console.write_line(s)
s.add(1)
Console.write_line(s)
data = Enumerable(
[
{"name": "Alice", "age": 30},
{"name": "Dave", "age": 35},
{"name": "Charlie", "age": 25},
{"name": "Bob", "age": 25},
]
)
Console.write_line(data.order_by(lambda x: x["age"]).to_list())
Console.write_line(data.order_by(lambda x: x["age"]).then_by(lambda x: x["name"]).to_list())
Console.write_line(data.order_by(lambda x: x["name"]).then_by(lambda x: x["age"]).to_list())
def t_benchmark(data: list):
Benchmark.all("Enumerable", lambda: Enumerable(data).where(lambda x: x % 2 == 0).select(lambda x: x * 2).to_list())
Benchmark.all("Set", lambda: Set(data).where(lambda x: x % 2 == 0).select(lambda x: x * 2).to_list())
Benchmark.all("List", lambda: List(data).where(lambda x: x % 2 == 0).select(lambda x: x * 2).to_list())
Benchmark.all(
"ImmutableList", lambda: ImmutableList(data).where(lambda x: x % 2 == 0).select(lambda x: x * 2).to_list()
)
Benchmark.all("List comprehension", lambda: [x * 2 for x in data if x % 2 == 0])
def main():
N = 1_000_000
data = list(range(N))
t_benchmark(data)
Console.write_line()
_default()
if __name__ == "__main__":
main()

View File

@@ -1,15 +1,15 @@
#!/usr/bin/env bash
set -euo pipefail
# Find and combine requirements from src/*/requirements.txt,
# Find and combine requirements from src/cpl-*/requirements.txt,
# filtering out lines whose *package name* starts with "cpl-".
# Works with pinned versions, extras, markers, editable installs, and VCS refs.
shopt -s nullglob
req_files=(src/*/requirements.txt)
req_files=(src/cpl-*/requirements.txt)
if ((${#req_files[@]} == 0)); then
echo "No requirements files found at src/*/requirements.txt" >&2
echo "No requirements files found at src/cpl-*/requirements.txt" >&2
exit 1
fi

View File

@@ -1,6 +0,0 @@
from .error import APIError, AlreadyExists, EndpointNotImplemented, Forbidden, NotFound, Unauthorized
from .logger import APILogger
from .settings import ApiSettings
from .api_module import ApiModule
__version__ = "1.0.0"

View File

@@ -1 +0,0 @@
from .asgi_middleware_abc import ASGIMiddleware

View File

@@ -1,45 +0,0 @@
from abc import ABC
from enum import Enum
from typing import Self
from starlette.applications import Starlette
from cpl.api.model.api_route import ApiRoute
from cpl.api.model.validation_match import ValidationMatch
from cpl.api.typing import HTTPMethods, PartialMiddleware, TEndpoint, PolicyInput
from cpl.application.abc.application_abc import ApplicationABC
from cpl.dependency.service_provider import ServiceProvider
from cpl.dependency.typing import Modules
class WebAppABC(ApplicationABC, ABC):
def __init__(self, services: ServiceProvider, modules: Modules, required_modules: list[str | object] = None):
ApplicationABC.__init__(self, services, modules, required_modules)
def with_routes_directory(self, directory: str) -> Self: ...
def with_app(self, app: Starlette) -> Self: ...
def with_routes(
self,
routes: list[ApiRoute],
method: HTTPMethods,
authentication: bool = False,
roles: list[str | Enum] = None,
permissions: list[str | Enum] = None,
policies: list[str] = None,
match: ValidationMatch = None,
) -> Self: ...
def with_route(
self,
path: str,
fn: TEndpoint,
method: HTTPMethods,
authentication: bool = False,
roles: list[str | Enum] = None,
permissions: list[str | Enum] = None,
policies: list[str] = None,
match: ValidationMatch = None,
) -> Self: ...
def with_middleware(self, middleware: PartialMiddleware) -> Self: ...
def with_authentication(self) -> Self: ...
def with_authorization(self, *policies: list[PolicyInput] | PolicyInput) -> Self: ...

View File

@@ -1,22 +0,0 @@
from cpl.api import ApiSettings
from cpl.api.registry.policy import PolicyRegistry
from cpl.api.registry.route import RouteRegistry
from cpl.auth.auth_module import AuthModule
from cpl.auth.permission.permission_module import PermissionsModule
from cpl.database.database_module import DatabaseModule
from cpl.dependency import ServiceCollection
from cpl.dependency.module.module import Module
class ApiModule(Module):
config = [ApiSettings]
singleton = [
PolicyRegistry,
RouteRegistry,
]
@staticmethod
def register(collection: ServiceCollection):
collection.add_module(DatabaseModule)
collection.add_module(AuthModule)
collection.add_module(PermissionsModule)

View File

@@ -1 +0,0 @@
from .web_app import WebApp

View File

@@ -1,7 +0,0 @@
from cpl.core.log.wrapped_logger import WrappedLogger
class APILogger(WrappedLogger):
def __init__(self):
WrappedLogger.__init__(self, "api")

View File

@@ -1,4 +0,0 @@
from .authentication import AuthenticationMiddleware
from .authorization import AuthorizationMiddleware
from .logging import LoggingMiddleware
from .request import RequestMiddleware

View File

@@ -1,98 +0,0 @@
import time
from contextvars import ContextVar
from typing import Optional, Union
from uuid import uuid4
from starlette.requests import Request
from starlette.types import Scope, Receive, Send
from starlette.websockets import WebSocket
from cpl.api.abc.asgi_middleware_abc import ASGIMiddleware
from cpl.api.logger import APILogger
from cpl.api.typing import TRequest
from cpl.auth.keycloak.keycloak_client import KeycloakClient
from cpl.auth.schema import User
from cpl.auth.schema._administration.user_dao import UserDao
from cpl.core.ctx import set_user
from cpl.dependency.inject import inject
from cpl.dependency.service_provider import ServiceProvider
_request_context: ContextVar[Union[TRequest, None]] = ContextVar("request", default=None)
class RequestMiddleware(ASGIMiddleware):
def __init__(self, app, provider: ServiceProvider, logger: APILogger, keycloak: KeycloakClient, user_dao: UserDao):
ASGIMiddleware.__init__(self, app)
self._provider = provider
self._logger = logger
self._keycloak = keycloak
self._user_dao = user_dao
self._ctx_token = None
async def __call__(self, scope: Scope, receive: Receive, send: Send):
request = Request(scope, receive, send) if scope["type"] != "websocket" else WebSocket(scope, receive, send)
await self.set_request_data(request)
try:
await self._try_set_user(request)
with self._provider.create_scope():
inject(await self._app(scope, receive, send))
finally:
await self.clean_request_data()
async def set_request_data(self, request: TRequest):
request.state.request_id = uuid4()
request.state.start_time = time.time()
self._logger.trace(f"Set new current request: {request.state.request_id}")
self._ctx_token = _request_context.set(request)
async def clean_request_data(self):
request = get_request()
if request is None:
return
if self._ctx_token is None:
return
self._logger.trace(f"Clearing current request: {request.state.request_id}")
_request_context.reset(self._ctx_token)
async def _try_set_user(self, request: Request):
auth_header = request.headers.get("Authorization")
if not auth_header or not auth_header.startswith("Bearer "):
return
token = auth_header.split("Bearer ")[1]
try:
token_info = self._keycloak.introspect(token)
if not token_info.get("active", False):
return
keycloak_id = self._keycloak.get_user_id(token)
if not keycloak_id:
return
user = await self._user_dao.find_by_keycloak_id(keycloak_id)
if not user:
user = User(0, keycloak_id)
uid = await self._user_dao.create(user)
user = await self._user_dao.get_by_id(uid)
if user.deleted:
return
request.state.user = user
set_user(user)
self._logger.trace(f"User {user.id} bound to request {request.state.request_id}")
except Exception as e:
self._logger.debug(f"Silent user binding failed: {e}")
def get_request() -> Optional[TRequest]:
return _request_context.get()

View File

@@ -1,3 +0,0 @@
from .api_route import ApiRoute
from .policy import Policy
from .validation_match import ValidationMatch

View File

@@ -1,31 +0,0 @@
from typing import Callable
import starlette.routing
class WebSocketRoute:
def __init__(self, path: str, fn: Callable, **kwargs):
self._path = path
self._fn = fn
self._kwargs = kwargs
@property
def name(self) -> str:
return self._fn.__name__
@property
def fn(self) -> Callable:
return self._fn
@property
def path(self) -> str:
return self._path
@property
def kwargs(self) -> dict:
return self._kwargs
def to_starlette(self, *args) -> starlette.routing.WebSocketRoute:
return starlette.routing.WebSocketRoute(self._path, self._fn)

View File

@@ -1,2 +0,0 @@
from .policy import PolicyRegistry
from .route import RouteRegistry

View File

@@ -1,106 +0,0 @@
from abc import ABC, abstractmethod
from typing import Callable, Self
from cpl.application.host import Host
from cpl.core.errors import module_dependency_error
from cpl.core.log.log_level import LogLevel
from cpl.core.log.log_settings import LogSettings
from cpl.core.log.logger_abc import LoggerABC
from cpl.dependency.service_provider import ServiceProvider
from cpl.dependency.typing import TModule
def __not_implemented__(package: str, func: Callable):
raise NotImplementedError(f"Package {package} is required to use {func.__name__} method")
class ApplicationABC(ABC):
r"""ABC for the Application class
Parameters:
services: :class:`cpl.dependency.service_provider.ServiceProvider`
Contains instances of prepared objects
"""
@abstractmethod
def __init__(
self, services: ServiceProvider, loaded_modules: set[TModule], required_modules: list[str | object] = None
):
self._services = services
self._modules = loaded_modules
self._required_modules = (
[x.__name__ if not isinstance(x, str) else x for x in required_modules] if required_modules else []
)
def validate_app_required_modules(self):
modules_names = {x.__name__ for x in self._modules}
for module in self._required_modules:
if module in modules_names:
continue
module_dependency_error(
type(self).__name__,
module.__name__ if not isinstance(module, str) else module,
ImportError(
f"Required module '{module}' for application '{self.__class__.__name__}' is not loaded. Load using 'add_module({module})' method."
),
)
def with_logging(self, level: LogLevel = None):
if level is None:
from cpl.core.configuration.configuration import Configuration
settings = Configuration.get(LogSettings)
level = settings.level if settings else LogLevel.info
logger = self._services.get_service(LoggerABC)
logger.set_level(level)
def with_permissions(self, *args):
try:
from cpl.auth import AuthModule
AuthModule.with_permissions(*args)
except ImportError:
__not_implemented__("cpl-auth", self.with_permissions)
def with_migrations(self, *args):
try:
from cpl.database.database_module import DatabaseModule
DatabaseModule.with_migrations(self._services, *args)
except ImportError:
__not_implemented__("cpl-database", self.with_migrations)
def with_extension(self, func: Callable[[Self, ...], None], *args, **kwargs):
r"""Extend the Application with a custom method
Parameters:
func: :class:`Callable[[Self], Self]`
Function that takes the Application as a parameter and returns it
"""
assert func is not None, "func must not be None"
assert callable(func), "func must be callable"
func(self, *args, **kwargs)
def run(self):
r"""Entry point
Called by custom Application.main
"""
try:
for module in self._modules:
if not hasattr(module, "configure") and not callable(getattr(module, "configure")):
continue
module.configure(self._services)
Host.run_app(self.main)
except KeyboardInterrupt:
pass
finally:
logger = self._services.get_service(LoggerABC)
logger.info("Application shutdown")
@abstractmethod
def main(self): ...

View File

@@ -1,98 +0,0 @@
import asyncio
from typing import Callable
from cpl.core.property import classproperty
from cpl.dependency.context import get_provider, use_root_provider
from cpl.dependency.service_collection import ServiceCollection
from cpl.core.service.startup_task import StartupTask
class Host:
_loop: asyncio.AbstractEventLoop | None = None
_tasks: dict = {}
_service_collection: ServiceCollection | None = None
@classproperty
def services(cls) -> ServiceCollection:
if cls._service_collection is None:
cls._service_collection = ServiceCollection()
return cls._service_collection
@classmethod
def get_provider(cls):
provider = get_provider()
if provider is None:
provider = cls.services.build()
use_root_provider(provider)
return provider
@classmethod
def get_loop(cls) -> asyncio.AbstractEventLoop:
if cls._loop is None:
cls._loop = asyncio.new_event_loop()
asyncio.set_event_loop(cls._loop)
return cls._loop
@classmethod
def run_start_tasks(cls):
provider = cls.get_provider()
tasks = provider.get_services(StartupTask)
loop = cls.get_loop()
for task in tasks:
if asyncio.iscoroutinefunction(task.run):
loop.run_until_complete(task.run())
else:
task.run()
@classmethod
def run_hosted_services(cls):
provider = cls.get_provider()
services = provider.get_hosted_services()
loop = cls.get_loop()
for service in services:
if asyncio.iscoroutinefunction(service.start):
cls._tasks[service] = loop.create_task(service.start())
@classmethod
async def _stop_all(cls):
for service in cls._tasks.keys():
if asyncio.iscoroutinefunction(service.stop):
await service.stop()
for task in cls._tasks.values():
task.cancel()
cls._tasks.clear()
@classmethod
async def wait_for_all(cls):
await asyncio.gather(*cls._tasks.values())
@classmethod
def run_app(cls, func: Callable, *args, **kwargs):
cls.run_start_tasks()
cls.run_hosted_services()
async def runner():
try:
if asyncio.iscoroutinefunction(func):
await func(*args, **kwargs)
else:
func(*args, **kwargs)
except (KeyboardInterrupt, asyncio.CancelledError):
pass
cls.get_loop().run_until_complete(runner())
cls.get_loop().run_until_complete(cls.wait_for_all())
@classmethod
def run(cls, func: Callable, *args, **kwargs):
if asyncio.iscoroutinefunction(func):
return cls.get_loop().run_until_complete(func(*args, **kwargs))
return func(*args, **kwargs)

View File

@@ -1,8 +0,0 @@
from cpl.auth import permission as _permission
from cpl.auth.keycloak.keycloak_admin import KeycloakAdmin as _KeycloakAdmin
from cpl.auth.keycloak.keycloak_client import KeycloakClient as _KeycloakClient
from .auth_module import AuthModule
from .keycloak_settings import KeycloakSettings
from .logger import AuthLogger
__version__ = "1.0.0"

View File

@@ -1,56 +0,0 @@
import os
from enum import Enum
from typing import Type
from cpl.auth.keycloak_settings import KeycloakSettings
from cpl.database.database_module import DatabaseModule
from cpl.database.model.server_type import ServerType, ServerTypes
from cpl.database.mysql.mysql_module import MySQLModule
from cpl.database.postgres.postgres_module import PostgresModule
from cpl.dependency.module.module import Module
from cpl.dependency.service_provider import ServiceProvider
from .keycloak.keycloak_admin import KeycloakAdmin
from .keycloak.keycloak_client import KeycloakClient
from .schema._administration.api_key_dao import ApiKeyDao
from .schema._administration.user_dao import UserDao
from .schema._permission.api_key_permission_dao import ApiKeyPermissionDao
from .schema._permission.permission_dao import PermissionDao
from .schema._permission.role_dao import RoleDao
from .schema._permission.role_permission_dao import RolePermissionDao
from .schema._permission.role_user_dao import RoleUserDao
class AuthModule(Module):
dependencies = [DatabaseModule, (MySQLModule, PostgresModule)]
config = [KeycloakSettings]
singleton = [
KeycloakClient,
KeycloakAdmin,
UserDao,
ApiKeyDao,
ApiKeyPermissionDao,
PermissionDao,
RoleDao,
RolePermissionDao,
RoleUserDao,
]
scoped = []
transient = []
@staticmethod
def configure(provider: ServiceProvider):
paths = {
ServerTypes.POSTGRES: "scripts/postgres",
ServerTypes.MYSQL: "scripts/mysql",
}
DatabaseModule.with_migrations(
provider, str(os.path.join(os.path.dirname(os.path.realpath(__file__)), paths[ServerType.server_type]))
)
@staticmethod
def with_permissions(*permissions: Type[Enum]):
from cpl.auth.permission.permissions_registry import PermissionsRegistry
for perm in permissions:
PermissionsRegistry.with_enum(perm)

View File

@@ -1,7 +0,0 @@
from cpl.core.log.wrapped_logger import WrappedLogger
class AuthLogger(WrappedLogger):
def __init__(self):
WrappedLogger.__init__(self, "auth")

View File

@@ -1,4 +0,0 @@
from .permission_module import PermissionsModule
from .permission_seeder import PermissionSeeder
from .permissions import Permissions
from .permissions_registry import PermissionsRegistry

View File

@@ -1,18 +0,0 @@
from cpl.auth.auth_module import AuthModule
from cpl.auth.permission.permission_seeder import PermissionSeeder
from cpl.auth.permission.permissions import Permissions
from cpl.auth.permission.permissions_registry import PermissionsRegistry
from cpl.auth.permission.role_seeder import RoleSeeder
from cpl.database.abc.data_seeder_abc import DataSeederABC
from cpl.database.database_module import DatabaseModule
from cpl.dependency.module.module import Module
from cpl.dependency.service_collection import ServiceCollection
class PermissionsModule(Module):
dependencies = [DatabaseModule, AuthModule]
transient = [(DataSeederABC, PermissionSeeder), (DataSeederABC, RoleSeeder)]
@staticmethod
def register(collection: ServiceCollection):
PermissionsRegistry.with_enum(Permissions)

View File

@@ -1,60 +0,0 @@
from cpl.auth.schema import (
Role,
RolePermission,
PermissionDao,
RoleDao,
RolePermissionDao,
ApiKeyDao,
ApiKeyPermissionDao,
UserDao,
RoleUserDao,
RoleUser,
)
from cpl.database.abc.data_seeder_abc import DataSeederABC
from cpl.database.logger import DBLogger
class RoleSeeder(DataSeederABC):
def __init__(
self,
logger: DBLogger,
permission_dao: PermissionDao,
role_dao: RoleDao,
role_permission_dao: RolePermissionDao,
api_key_dao: ApiKeyDao,
api_key_permission_dao: ApiKeyPermissionDao,
user_dao: UserDao,
role_user_dao: RoleUserDao,
):
DataSeederABC.__init__(self)
self._logger = logger
self._permission_dao = permission_dao
self._role_dao = role_dao
self._role_permission_dao = role_permission_dao
self._api_key_dao = api_key_dao
self._api_key_permission_dao = api_key_permission_dao
self._user_dao = user_dao
self._role_user_dao = role_user_dao
async def seed(self):
self._logger.info("Creating admin role")
roles = await self._role_dao.get_all()
if len(roles) == 0:
rid = await self._role_dao.create(Role(0, "admin", "Default admin role"))
permissions = await self._permission_dao.get_all()
await self._role_permission_dao.create_many(
[RolePermission(0, rid, permission.id) for permission in permissions]
)
role = await self._role_dao.get_by_name("admin")
if len(await role.users) > 0:
return
users = await self._user_dao.get_all()
if len(users) == 0:
return
user = users[0]
self._logger.warning(f"Assigning admin role to first user {user.id}")
await self._role_user_dao.create(RoleUser(0, role.id, user.id))

View File

@@ -1,89 +0,0 @@
import uuid
from datetime import datetime
from typing import Optional, Self
from async_property import async_property
from keycloak import KeycloakGetError
from cpl.auth.keycloak import KeycloakAdmin
from cpl.auth.permission.permissions import Permissions
from cpl.core.typing import SerialId
from cpl.database.abc import DbModelABC
from cpl.database.logger import DBLogger
from cpl.dependency import get_provider
class User(DbModelABC[Self]):
def __init__(
self,
id: SerialId,
keycloak_id: str,
deleted: bool = False,
editor_id: SerialId | None = None,
created: datetime | None = None,
updated: datetime | None = None,
):
DbModelABC.__init__(self, id, deleted, editor_id, created, updated)
self._keycloak_id = keycloak_id
@property
def keycloak_id(self) -> str:
return self._keycloak_id
@property
def username(self):
if self._keycloak_id == str(uuid.UUID(int=0)):
return "ANONYMOUS"
try:
keycloak = get_provider().get_service(KeycloakAdmin)
return keycloak.get_user(self._keycloak_id).get("username")
except KeycloakGetError as e:
return "UNKNOWN"
except Exception as e:
logger = get_provider().get_service(DBLogger)
logger.error(f"Failed to get user {self._keycloak_id} from Keycloak", e)
return "UNKNOWN"
@property
def email(self):
if self._keycloak_id == str(uuid.UUID(int=0)):
return "ANONYMOUS"
try:
keycloak = get_provider().get_service(KeycloakAdmin)
return keycloak.get_user(self._keycloak_id).get("email")
except KeycloakGetError as e:
return "UNKNOWN"
except Exception as e:
logger = get_provider().get_service(DBLogger)
logger.error(f"Failed to get user {self._keycloak_id} from Keycloak", e)
return "UNKNOWN"
@async_property
async def roles(self):
from cpl.auth.schema._permission.role_user_dao import RoleUserDao
role_user_dao: RoleUserDao = get_provider().get_service(RoleUserDao)
return [await x.role for x in await role_user_dao.get_by_user_id(self.id)]
@async_property
async def permissions(self):
from cpl.auth.schema._administration.user_dao import UserDao
user_dao: UserDao = get_provider().get_service(UserDao)
return await user_dao.get_permissions(self.id)
async def has_permission(self, permission: Permissions) -> bool:
from cpl.auth.schema._administration.user_dao import UserDao
user_dao: UserDao = get_provider().get_service(UserDao)
return await user_dao.has_permission(self.id, permission)
async def anonymize(self):
from cpl.auth.schema._administration.user_dao import UserDao
user_dao: UserDao = get_provider().get_service(UserDao)
self._keycloak_id = str(uuid.UUID(int=0))
await user_dao.update(self)

View File

@@ -1,44 +0,0 @@
from datetime import datetime
from typing import Self
from async_property import async_property
from cpl.core.typing import SerialId
from cpl.database.abc import DbJoinModelABC
from cpl.dependency import get_provider
class RolePermission(DbJoinModelABC[Self]):
def __init__(
self,
id: SerialId,
role_id: SerialId,
permission_id: SerialId,
deleted: bool = False,
editor_id: SerialId | None = None,
created: datetime | None = None,
updated: datetime | None = None,
):
DbJoinModelABC.__init__(self, id, role_id, permission_id, deleted, editor_id, created, updated)
@property
def role_id(self) -> int:
return self._source_id
@async_property
async def role(self):
from cpl.auth.schema._permission.role_dao import RoleDao
role_dao: RoleDao = get_provider().get_service(RoleDao)
return await role_dao.get_by_id(self._source_id)
@property
def permission_id(self) -> int:
return self._foreign_id
@async_property
async def permission(self):
from cpl.auth.schema._permission.permission_dao import PermissionDao
permission_dao: PermissionDao = get_provider().get_service(PermissionDao)
return await permission_dao.get_by_id(self._foreign_id)

View File

@@ -1,29 +0,0 @@
{
"name": "cpl-cli",
"version": "0.1.0",
"type": "console",
"license": "MIT",
"author": "Sven Heidemann",
"description": "CLI for the CPL library",
"homepage": "",
"keywords": [],
"dependencies": {
"click": "~8.3.0"
},
"devDependencies": {
"black": "~25.9"
},
"references": [],
"main": "cpl/cli/main.py",
"directory": "cpl/cli",
"build": {
"include": [
"_templates/"
],
"exclude": [
"**/__pycache__",
"**/logs",
"**/tests"
]
}
}

View File

@@ -1,9 +0,0 @@
from abc import ABC
class <Name>ABC(ABC):
def __init__(self):
ABC.__init__(self)
print("<schematic> <multi_camelName> initialized")

View File

@@ -1,16 +0,0 @@
from cpl.application.abc import ApplicationABC
from cpl.core.environment import Environment
from cpl.core.log import LoggerABC
from cpl.dependency import ServiceProvider
from cpl.dependency.typing import Modules
class <Name>(ApplicationABC):
def __init__(self, services: ServiceProvider, modules: Modules):
ApplicationABC.__init__(self, services, modules)
self._logger = services.get_service(LoggerABC)
async def main(self):
self._logger.debug(f"Host: {Environment.get_host_name()}")
self._logger.debug(f"Environment: {Environment.get_environment()}")

View File

@@ -1,10 +0,0 @@
from cpl.core.configuration import ConfigurationModelABC
class <Name>Config(ConfigurationModelABC):
def __init__(
self,
src: dict = None,
):
ConfigurationModelABC.__init__(self, src)

View File

@@ -1,9 +0,0 @@
from cpl.core.console import Console
from cpl.core.service import CronjobABC
class CronJob(CronjobABC):
def __init__(self):
CronjobABC.__init__(self, Cron("*/1 * * * *"))
async def loop(self):
Console.write_line(f"[{datetime.now()}] Hello, World!")

View File

@@ -1,9 +0,0 @@
from cpl.database.abc import DbModelDaoABC
class <Name>Dao(DbModelDaoABC[<Name>]):
def __init__(self):
DbModelDaoABC.__init__(self, <Name>, "<multi_name>")
self.attribute(<Name>.name, str)

View File

@@ -1,23 +0,0 @@
from datetime import datetime
from typing import Self
from cpl.core.typing import SerialId
from cpl.database.abc import DbModelABC
class <Name>(DbModelABC[Self]):
def __init__(
self,
id: SerialId,
name: str,
deleted: bool = False,
editor_id: SerialId | None = None,
created: datetime | None = None,
updated: datetime | None = None,
):
DbModelABC.__init__(self, id, deleted, editor_id, created, updated)
self._name = name
@property
def name(self) -> str:
return self._name

View File

@@ -1,29 +0,0 @@
from datetime import datetime
from typing import Self
from cpl.core.typing import SerialId
from cpl.database.abc import DbJoinModelABC
class <Name>Join(DbJoinModelABC[Self]):
def __init__(
self,
id: SerialId,
source_id: SerialId,
reference_id: SerialId,
deleted: bool = False,
editor_id: SerialId | None = None,
created: datetime | None = None,
updated: datetime | None = None,
):
DbJoinModelABC.__init__(self, source_id, reference_id, id, deleted, editor_id, created, updated)
self._source_id = source_id
self._reference_id = reference_id
@property
def source_id(self) -> int:
return self._source_id
@property
def reference(self) -> int:
return self._reference_id

View File

@@ -1,5 +0,0 @@
from enum import Enum
class <Name>Enum(Enum):
KEY = "value"

View File

@@ -1,13 +0,0 @@
from cpl.core.console import Console
from cpl.core.service import HostedService
class <Name>(HostedService):
def __init__(self):
HostedService.__init__(self)
async def start(self):
Console.write_line("Hello, World!")
async def stop(self):
Console.write_line("Goodbye, World!")

View File

@@ -1,7 +0,0 @@
from cpl.core.log.wrapped_logger import WrappedLogger
class <Name>Logger(WrappedLogger):
def __init__(self):
WrappedLogger.__init__(self, "<name>")

View File

@@ -1,17 +0,0 @@
from cpl.dependency import ServiceCollection, ServiceProvider
from cpl.dependency.module import Module
class <Name>Module(Module):
dependencies = []
configuration = []
singleton = []
scoped = []
transient = []
hosted = []
@staticmethod
def register(collection: ServiceCollection): ...
@staticmethod
def configure(provider: ServiceProvider): ...

View File

@@ -1,9 +0,0 @@
import multiprocessing
class <Name>(multiprocessing.Process):
def __init__(self):
multiprocessing.Process.__init__(self)
def run(self): ...

View File

@@ -1,11 +0,0 @@
from cpl.core.pipes import PipeABC
from cpl.core.typing import T
class <Name>Pipe(PipeABC):
@staticmethod
def to_str(value: T, *args) -> str: ...
@staticmethod
def from_str(value: str, *args) -> T: ...

View File

@@ -1,9 +0,0 @@
import threading
class <Name>(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self): ...

View File

@@ -1,18 +0,0 @@
from cpl.api.application import WebApp
from cpl.core.environment import Environment
from cpl.core.log import LoggerABC
from cpl.dependency import ServiceProvider
from cpl.dependency.typing import Modules
class <Name>(WebApp):
def __init__(self, services: ServiceProvider, modules: Modules):
WebApp.__init__(self, services, modules)
self._logger = services.get_service(LoggerABC)
async def main(self):
self._logger.debug(f"Host: {Environment.get_host_name()}")
self._logger.debug(f"Environment: {Environment.get_environment()}")
await super().main()

View File

@@ -1,9 +0,0 @@
from cpl.core.console import Console
def main():
Console.write_line("Hello, World!")
if __name__ == "__main__":
main()

View File

@@ -1,46 +0,0 @@
from cpl.api import ApiModule
from cpl.application import ApplicationBuilder
from cpl.core.configuration import Configuration
from cpl.graphql.application import GraphQLApp
from starlette.responses import JSONResponse
def main():
builder = ApplicationBuilder[GraphQLApp](GraphQLApp)
Configuration.add_json_file(f"appsettings.json", optional=True)
(
builder.services.add_logging()
# uncomment to add preferred database module
# .add_module(MySQLModule)
# .add_module(PostgresModule)
.add_module(ApiModule)
)
app = builder.build()
app.with_logging()
app.with_authentication()
app.with_authorization()
app.with_route(
path="/ping",
fn=lambda r: JSONResponse("pong"),
method="GET",
)
schema = app.with_graphql()
schema.query.string_field("ping", resolver=lambda: "pong")
app.with_auth_root_queries(True)
app.with_auth_root_mutations(True)
app.with_playground()
app.with_graphiql()
app.run()
if __name__ == "__main__":
main()

View File

@@ -1,3 +0,0 @@
class Class1:
def __init__(self): ...

View File

@@ -1,13 +0,0 @@
from cpl.application import Host
from my_hosted_service import MyHostedService
async def main():
Host.services.add_hosted_service(MyHostedService)
Host.run_start_tasks()
Host.run_hosted_services()
await Host.wait_for_all()
if __name__ == "__main__":
Host.run(main)

View File

@@ -1,13 +0,0 @@
from cpl.core.console import Console
from cpl.dependency.hosted import HostedService
class MyHostedService(HostedService):
def __init__(self):
HostedService.__init__(self)
async def start(self):
Console.write_line("Hello, World!")
async def stop(self):
Console.write_line("Goodbye, World!")

View File

@@ -1,37 +0,0 @@
from starlette.responses import JSONResponse
from cpl.api import ApiModule
from cpl.api.application import WebApp
from cpl.application import ApplicationBuilder
from cpl.core.configuration import Configuration
def main():
builder = ApplicationBuilder[WebApp](WebApp)
Configuration.add_json_file(f"appsettings.json", optional=True)
(
builder.services.add_logging()
# uncomment to add preferred database module
# .add_module(MySQLModule)
# .add_module(PostgresModule)
.add_module(ApiModule)
)
app = builder.build()
app.with_logging()
app.with_authentication()
app.with_authorization()
app.with_route(
path="/ping",
fn=lambda r: JSONResponse("pong"),
method="GET",
)
app.run()
if __name__ == "__main__":
main()

View File

@@ -1 +0,0 @@
__version__ = "1.0.0"

View File

@@ -1,55 +0,0 @@
import traceback
import click
from cpl.core.console import Console
class AliasedGroup(click.Group):
def command(self, *args, **kwargs):
aliases = kwargs.pop("aliases", [])
def decorator(f):
cmd = super(AliasedGroup, self).command(*args, **kwargs)(f)
cmd.callback = self._handle_errors(cmd.callback)
for alias in aliases:
self.add_command(cmd, alias)
return cmd
return decorator
def format_commands(self, ctx, formatter):
commands = []
seen = set()
for name, cmd in self.commands.items():
if cmd in seen:
continue
seen.add(cmd)
aliases = [a for a, c in self.commands.items() if c is cmd and a != name]
alias_text = f" (aliases: {', '.join(aliases)})" if aliases else ""
commands.append((name, f"{cmd.short_help or ''}{alias_text}"))
with formatter.section("Commands"):
formatter.write_dl(commands)
@staticmethod
def _handle_errors(f):
def wrapper(*args, **kwargs):
try:
res = f(*args, **kwargs)
Console.write_line()
return res
except Exception as e:
tb = None
if "verbose" in kwargs and kwargs["verbose"]:
tb = traceback.format_exc()
Console.error(str(e), tb)
Console.write_line()
exit(-1)
return wrapper
@click.group(cls=AliasedGroup)
def cli(): ...

View File

@@ -1,27 +0,0 @@
from pathlib import Path
import click
from cpl.cli.cli import cli
from cpl.cli.utils.structure import Structure
from cpl.core.console import Console
@cli.command("add", aliases=["a"])
@click.argument("reference", type=click.STRING, required=True)
@click.argument("target", type=click.STRING, required=True)
@click.option("--verbose", "-v", is_flag=True, help="Enable verbose output")
def add(reference: str, target: str, verbose: bool):
reference_project = Structure.get_project_by_name_or_path(reference)
target_project = Structure.get_project_by_name_or_path(target)
if reference_project.name == target_project.name:
raise ValueError("Cannot add a project as a dependency to itself!")
if reference_project.path in target_project.references:
raise ValueError(f"Project '{reference_project.name}' is already a reference of '{target_project.name}'")
rel_path = Path(reference_project.path).relative_to(Path(target_project.path).parent, walk_up=True)
target_project.references.append(str(rel_path))
target_project.save()
Console.write_line(f"Added '{reference_project.name}' to '{target_project.name}' project")

View File

@@ -1,69 +0,0 @@
import os
import subprocess
from pathlib import Path
import click
from cpl.cli.cli import cli
from cpl.cli.const import PIP_URL
from cpl.cli.utils.pip import Pip
from cpl.cli.utils.structure import Structure
from cpl.core.console import Console
@cli.command("install", aliases=["i"])
@click.argument("package", type=click.STRING, required=False)
@click.argument("project", type=click.STRING, required=False)
@click.option("--dev", is_flag=True, help="Include dev dependencies")
@click.option("--verbose", "-v", is_flag=True, help="Enable verbose output")
def install(package: str, project: str, dev: bool, verbose: bool):
project = Structure.get_project_by_name_or_path(project or "./")
if package is not None:
Console.write_line(f"Installing {package} to '{project.name}':")
try:
Pip.command(
f"install --extra-index-url {PIP_URL}",
package,
verbose=verbose,
path=Path(project.path).parent,
)
except subprocess.CalledProcessError as e:
Console.error(f"Failed to install {package}: exit code {e.returncode}")
return
package_name = Pip.get_package_without_version(package)
installed_version = Pip.get_package_version(package_name, path=project.path)
if installed_version is None:
Console.error(f"Package '{package_name}' not found after installation.")
return
deps = project.dependencies if not dev else project.dev_dependencies
deps[package_name] = Pip.apply_prefix(installed_version, Pip.get_package_full_version(package))
project.save()
Console.write_line(f"Added {package_name}~{installed_version} to project dependencies.")
return
deps: dict = project.dependencies
if dev:
deps.update(project.dev_dependencies)
if not deps:
Console.error("No dependencies to install.")
return
Console.write_line(f"Installing dependencies for '{project.name}':")
for name, version in deps.items():
dep = Pip.normalize_dep(name, version)
Console.write_line(f" -> {dep}")
try:
Pip.command(
"install --extra-index-url https://git.sh-edraft.de/api/packages/sh-edraft.de/pypi/simple/",
dep,
verbose=verbose,
path=project.path,
)
except subprocess.CalledProcessError as e:
Console.error(f"Failed to install {dep}: exit code {e.returncode}")

View File

@@ -1,26 +0,0 @@
from pathlib import Path
import click
from cpl.cli.cli import cli
from cpl.cli.utils.structure import Structure
from cpl.core.console import Console
@cli.command("remove", aliases=["rm"])
@click.argument("reference", type=click.STRING, required=True)
@click.argument("target", type=click.STRING, required=True)
def remove(reference: str, target: str):
reference_project = Structure.get_project_by_name_or_path(reference)
target_project = Structure.get_project_by_name_or_path(target)
if reference_project.name == target_project.name:
raise ValueError("Cannot add a project as a dependency to itself!")
rel_path = str(Path(reference_project.path).relative_to(Path(target_project.path).parent, walk_up=True))
if rel_path not in target_project.references:
raise ValueError(f"Project '{reference_project.name}' isn't a reference of '{target_project.name}'")
target_project.references.remove(rel_path)
target_project.save()
Console.write_line(f"Removed '{reference_project.name}' from '{target_project.name}' project")

View File

@@ -1,41 +0,0 @@
import subprocess
import click
from cpl.cli.cli import cli
from cpl.cli.utils.pip import Pip
from cpl.cli.utils.structure import Structure
from cpl.core.console import Console
@cli.command("uninstall", aliases=["ui"])
@click.argument("package", required=False)
@click.argument("project", type=click.STRING, required=False)
@click.option("--dev", is_flag=True, help="Include dev dependencies")
@click.option("--verbose", "-v", is_flag=True, help="Enable verbose output")
def uninstall(package: str, project: str, dev: bool, verbose: bool):
if package is None:
package = Console.read("Package name to uninstall: ").strip()
project = Structure.get_project_by_name_or_path(project or "./")
deps = project.dependencies if not dev else project.dev_dependencies
try:
Pip.command(
"uninstall -y",
package,
verbose=verbose,
path=project.path,
)
except subprocess.CalledProcessError as e:
Console.error(f"Failed to uninstall {package}: exit code {e.returncode}")
return
if package in deps:
del deps[package]
project.save()
Console.write_line(f"Removed {package} from project dependencies.")
return
Console.write_line(f"Package {package} was not found in project dependencies.")

View File

@@ -1,74 +0,0 @@
import subprocess
import click
from cpl.cli.cli import cli
from cpl.cli.const import PIP_URL
from cpl.cli.utils.pip import Pip
from cpl.cli.utils.structure import Structure
from cpl.core.console import Console
@cli.command("update", aliases=["u"])
@click.argument("package", type=click.STRING, required=False)
@click.argument("project", type=click.STRING, required=False)
@click.option("--dev", is_flag=True, help="Include dev dependencies")
@click.option("--verbose", "-v", is_flag=True, help="Enable verbose output")
def update(package: str, project: str, dev: bool, verbose: bool):
project = Structure.get_project_by_name_or_path(project or "./")
deps: dict = project.dependencies
if dev:
deps = project.dev_dependencies
if package is not None:
if package not in deps:
Console.error(f"Package '{package}' not installed.")
return
old_spec = deps[package]
Console.write_line(f"Updating {package} to '{project.name}':")
try:
Pip.command(
f"install --upgrade --extra-index-url {PIP_URL}" f"{Pip.normalize_dep(package, old_spec)}",
verbose=verbose,
path=project.path,
)
except subprocess.CalledProcessError as e:
Console.error(f"Failed to install {package}: exit code {e.returncode}")
return
installed_version = Pip.get_package_version(package, path=project.path)
if installed_version is None:
Console.error(f"Package '{package}' not found after update.")
return
deps[package] = Pip.apply_prefix(installed_version, old_spec)
project.save()
Console.write_line(f"Updated {package} to {deps[package]}")
return
if not deps:
Console.error("No dependencies to install.")
return
Console.write_line(f"Updating dependencies for '{project.name}':")
for name, version in list(deps.items()):
dep = Pip.normalize_dep(name, version)
Console.write_line(f" -> {dep}")
try:
Pip.command("install --upgrade", dep, verbose=verbose, path=project.path)
except subprocess.CalledProcessError as e:
Console.error(f"Failed to update {dep}: exit code {e.returncode}")
return
installed_version = Pip.get_package_version(name, path=project.path)
if installed_version is None:
Console.error(f"Package '{name}' not found after update.")
continue
deps[name] = Pip.apply_prefix(installed_version, version)
project.save()

View File

@@ -1,73 +0,0 @@
import os.path
import subprocess
from pathlib import Path
import click
from cpl.cli.cli import cli
from cpl.cli.utils.venv import ensure_venv, get_venv_python
from cpl.core.configuration import Configuration
from cpl.core.console import Console
@cli.command("build", aliases=["b"])
@click.argument("project", type=click.STRING, required=False)
@click.option("--dist", "-d", type=str)
@click.option("--skip-py-build", "-spb", is_flag=True, help="Skip toml generation and python build")
@click.option("--verbose", "-v", is_flag=True, help="Enable verbose output")
def build(project: str, dist: str = None, skip_py_build: bool = None, verbose: bool = None):
from cpl.cli.utils.structure import Structure
project = Structure.get_project_by_name_or_path(project or "./")
venv = ensure_venv().absolute()
dist_path = dist or Path(project.path).parent / "dist"
if dist is None and Configuration.get("workspace") is not None:
dist_path = Path(Configuration.get("workspace").path).parent / "dist"
dist_path = Path(dist_path).resolve().absolute()
if verbose:
Console.write_line(f"Creating dist folder at {dist_path}...")
os.makedirs(dist_path, exist_ok=True)
project.do_build(dist_path, verbose)
if skip_py_build:
Console.write_line("\nDone!")
return
Structure.create_pyproject_toml(project, dist_path / project.name)
python = str(get_venv_python(venv))
result = Console.spinner(
"Building python package...",
lambda: subprocess.run(
[
python,
"-m",
"build",
"--outdir",
str(dist_path / project.name),
str(dist_path / project.name),
],
check=True,
stdin=subprocess.DEVNULL if not verbose else None,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
),
)
if result is None:
raise RuntimeError("Build process did not run")
if verbose and result.stdout is not None:
Console.write_line(result.stdout.decode())
if result.returncode != 0 and result.stderr is not None:
if result.stderr is not None:
Console.error(str(result.stderr.decode()))
raise RuntimeError(f"Build process failed with exit code {result.returncode}")
Console.write_line(" Done!")

View File

@@ -1,55 +0,0 @@
import os
import subprocess
from pathlib import Path
import click
from cpl.cli.cli import cli
from cpl.cli.utils.structure import Structure
from cpl.cli.utils.venv import get_venv_python, ensure_venv
from cpl.core.configuration import Configuration
from cpl.core.console import Console
@cli.command("run", aliases=["r"])
@click.argument("project", type=str, required=False, default=None)
@click.argument("args", nargs=-1)
@click.option("--dev", "-d", is_flag=True, help="Use sources instead of build output")
@click.option("--verbose", "-v", is_flag=True, help="Enable verbose output")
def run(project: str, args: list[str], dev: bool, verbose: bool):
project_path = Path("./")
if project is not None:
project_path = (Path("./") / project).resolve().absolute()
project = Structure.get_project_by_name_or_path(str(project_path))
if project.main is None:
Console.error(f"Project {project.name} has no executable")
return
path = str(Path(project.path).parent.resolve().absolute())
executable = project.main
if not dev:
dist_path = Path(project.path).parent / "dist"
if Configuration.get("workspace") is not None:
dist_path = Path(Configuration.get("workspace").path).parent / "dist"
dist_path = Path(dist_path).resolve().absolute()
if verbose:
Console.write_line(f"Creating dist folder at {dist_path}...")
os.makedirs(dist_path, exist_ok=True)
project.do_build(dist_path, verbose)
path = dist_path / project.name
main = project.main.replace(project.directory, "").lstrip("/\\")
executable = path / main
python = str(get_venv_python(ensure_venv()).absolute())
Console.write_line(f"\nStarting project {project.name}...")
if verbose:
Console.write_line(f" with args {args}...")
Console.write_line("\n\n")
subprocess.run([python, executable, *args], cwd=path)

View File

@@ -1,31 +0,0 @@
from pathlib import Path
import click
from cpl.cli.cli import cli
from cpl.cli.utils.live_server.live_server import LiveServer
from cpl.cli.utils.structure import Structure
from cpl.core.console import Console
@cli.command("start", aliases=["s"])
@click.argument("project", type=str, required=False, default=None)
@click.argument("args", nargs=-1)
@click.option("--dev", "-d", is_flag=True, help="Use sources instead of build output")
@click.option("--verbose", "-v", is_flag=True, help="Enable verbose output")
def start(project: str, args: list[str], dev: bool, verbose: bool):
project_path = Path("./")
if project is not None:
project_path = (Path("./") / project).resolve().absolute()
project = Structure.get_project_by_name_or_path(str(project_path))
if project.main is None:
Console.error(f"Project {project.name} has no executable")
return
LiveServer(
project,
args,
dev,
verbose,
).start()

View File

@@ -1,104 +0,0 @@
import os
from pathlib import Path
import click
from cpl.cli import cli as clim
from cpl.cli.cli import cli
from cpl.cli.model.project import Project
from cpl.cli.model.workspace import Workspace
from cpl.cli.utils.structure import Structure
from cpl.cli.utils.template_collector import TemplateCollector
from cpl.cli.utils.template_renderer import TemplateRenderer
from cpl.core.configuration import Configuration
from cpl.core.console import Console
@cli.command("generate", aliases=["g"])
@click.argument("schematic", type=click.STRING, required=True)
@click.argument("name", type=click.STRING, required=True)
@click.option("--verbose", "-v", is_flag=True, help="Enable verbose output")
def generate(schematic: str, name: str, verbose: bool) -> None:
TemplateCollector.collect_templates(Path(clim.__file__).parent, verbose)
workspace: Workspace = Configuration.get("workspace")
if workspace is not None:
if verbose:
Console.write_line("Workspace found, collecting templates...")
TemplateCollector.collect_templates(Path(workspace.path).parent, verbose)
project = None
try:
project = Structure.get_project_by_name_or_path("./")
if verbose:
Console.write_line("project found, collecting templates...")
TemplateCollector.collect_templates(Path(project.path).parent, verbose)
except ValueError:
if verbose:
Console.write_line("Local project not found")
templates = TemplateCollector.get_templates()
schematics = {}
for template_name, template_content in templates.items():
t_name = template_name.split(".")[0]
if t_name in schematics:
raise ValueError(f"Duplicate schematic name found: {t_name}")
schematics[t_name] = template_name
for i in range(len(t_name)):
char = t_name[i]
if char in schematics:
continue
schematics[char] = template_name
break
if schematic not in schematics:
raise ValueError(
f"Schematic '{schematic}' not found. Available schematics: {', '.join([x.split(".")[0] for x in templates.keys()])}"
)
path, name = _get_name_and_path_from_name(name, project)
os.makedirs(path, exist_ok=True)
Console.write_line(f"Generating {str(path / name)} ...")
with open(path / f"{name}.py", "w") as f:
f.write(
TemplateRenderer.render_template(
schematics[schematic].split(".")[0], templates[schematics[schematic]], name, str(path)
)
)
def _get_name_and_path_from_name(in_name: str, project: Project = None) -> tuple[Path, str]:
path = ""
name = ""
in_name_parts = in_name.split("/")
if len(in_name_parts) == 1:
name = in_name_parts[0]
else:
path = "/".join(in_name_parts[:-1])
name = in_name_parts[-1]
workspace: Workspace = Configuration.get("workspace")
if workspace is None and project is not None:
return (Path(project.path).parent / project.directory / path).resolve().absolute(), name
elif workspace is None and project is None:
return Path(path).resolve().absolute(), name
selected_project = project
project_name = path.split("/")[0]
project_by_name = workspace.get_project_by_name(project_name)
if project_by_name is not None:
selected_project = project_by_name
path = "/".join(path.split("/")[1:])
if selected_project is None:
selected_project = workspace.get_project_by_name(workspace.default_project)
return (Path(selected_project.path).parent / selected_project.directory / path).resolve().absolute(), name

View File

@@ -1,42 +0,0 @@
from pathlib import Path
import click
from cpl.cli.const import PROJECT_TYPES, PROJECT_TYPES_SHORT
from cpl.cli.utils.prompt import ProjectType
from cpl.cli.utils.structure import Structure
from cpl.cli.utils.venv import ensure_venv
from cpl.core.console import Console
@click.command("init")
@click.argument("target", required=False)
@click.argument("name", required=False)
@click.option("--verbose", "-v", is_flag=True, help="Enable verbose output")
def init(target: str, name: str, verbose: bool = False):
workspace = None
project = None
if target is None:
Console.write_line("CPL Init Wizard")
target = click.prompt(
"What do you want to initialize?",
type=ProjectType,
show_choices=True,
)
if target in PROJECT_TYPES_SHORT:
target = [pt for pt in PROJECT_TYPES if pt.startswith(target)][0]
if target in ["workspace", "ws"]:
workspace = Structure.init_workspace("./", name or click.prompt("Workspace name", default="my-workspace"))
elif target in PROJECT_TYPES:
workspace = Structure.find_workspace_in_path(Path(name or "./").parent)
project = Structure.init_project(
"./", name or click.prompt("Project name", default=f"my-{target}"), target, workspace, verbose=verbose
)
else:
Console.error(f"Unknown target '{target}'")
raise SystemExit(1)
ensure_venv(Path((workspace or project).path).parent)

View File

@@ -1,70 +0,0 @@
import os
from pathlib import Path
import click
from cpl.cli import cli as clim
from cpl.cli.cli import cli
from cpl.cli.const import PROJECT_TYPES, PROJECT_TYPES_SHORT
from cpl.cli.model.workspace import Workspace
from cpl.cli.utils.prompt import ProjectType
from cpl.cli.utils.structure import Structure
from cpl.cli.utils.venv import ensure_venv
from cpl.core.console import Console
@cli.command("new", aliases=["n"])
@click.argument("type", type=ProjectType, required=True)
@click.argument("name", type=click.STRING, required=True)
@click.option("--name", "in_name", type=click.STRING, help="Name of the workspace or project to create.")
@click.option(
"--project",
"-p",
nargs=2,
metavar="<type> <name>",
help="Optional: when creating a workspace, also create a project with the given name and type.",
)
@click.option("--verbose", "-v", is_flag=True, help="Enable verbose output")
def new(type: str, name: str, in_name: str | None, project: list[str] | None, verbose: bool) -> None:
path = Path(name).parent
project_name = in_name or Path(name).stem
if type in ["workspace", "ws"]:
Structure.init_workspace(name, project_name)
workspace = Workspace.from_file(Path(name) / "cpl.workspace.json")
ensure_venv(Path(name))
if project is None or len(project) != 2:
return
type = project[0]
if type not in PROJECT_TYPES + PROJECT_TYPES_SHORT:
raise ValueError(f"Unknown project type '{type}'")
path = Path(workspace.path).parent / Path(project[1]).parent
project_name = Path(project[1]).stem
workspace = Structure.find_workspace_in_path(path)
if workspace is None:
Console.error("No workspace found. Please run 'cpl init workspace' first.")
raise SystemExit(1)
if project_name in workspace.project_names:
Console.error(f"Project '{project_name}' already exists in the workspace")
raise SystemExit(1)
if verbose:
Console.write_line(f"Creating project '{path/project_name}'...")
project_types = os.listdir(Path(clim.__file__).parent / ".cpl" / "new")
project_types.extend(set(x[0] for x in PROJECT_TYPES))
if type not in project_types:
raise ValueError(f"Unsupported project type '{type}'")
Structure.create_project(path, type, project_name, workspace, verbose)
ensure_venv(Path((workspace or project).path).parent)
if workspace.default_project is None:
workspace.default_project = project_name
workspace.save()

View File

@@ -1,27 +0,0 @@
import platform
import cpl
from cpl.cli.cli import cli
from cpl.cli.utils.pip import Pip
from cpl.core.console import Console, ForegroundColorEnum
@cli.command("version", aliases=["v"])
def version():
Console.set_foreground_color(ForegroundColorEnum.yellow)
Console.banner("CPL CLI")
Console.set_foreground_color(ForegroundColorEnum.default)
Console.write_line()
Console.write_line(f"CPL CLI: {getattr(cpl.cli, '__version__', "1.0")}")
Console.write_line(f"Python: {platform.python_version()}")
Console.write_line(f"PIP: {Pip.get_pip_version()}")
Console.write_line(f"OS: {platform.system()} {platform.release()}")
Console.write_line("\nCPL Packages:\n")
cpl_packages = {n: v for n, v in Pip.get_packages().items() if n.startswith("cpl-")}
if len(cpl_packages) == 0:
Console.write_line("No CPL packages installed")
return
Console.table(["Package", "Version"], [[n, v] for n, v in cpl_packages.items()])

View File

@@ -1,4 +0,0 @@
PROJECT_TYPES = ["console", "web", "graphql", "library", "service"]
PROJECT_TYPES_SHORT = [x[0] for x in PROJECT_TYPES]
PIP_URL = "https://git.sh-edraft.de/api/packages/sh-edraft.de/pypi/simple/"

View File

@@ -1,95 +0,0 @@
from pathlib import Path
from cpl.cli.cli import cli
from cpl.cli.command.package.add import add
from cpl.cli.command.package.install import install
from cpl.cli.command.package.remove import remove
from cpl.cli.command.package.uninstall import uninstall
from cpl.cli.command.package.update import update
from cpl.cli.command.project.build import build
from cpl.cli.command.project.run import run
from cpl.cli.command.project.start import start
from cpl.cli.command.structure.generate import generate
from cpl.cli.command.structure.init import init
from cpl.cli.command.structure.new import new
from cpl.cli.command.version import version
from cpl.cli.model.workspace import Workspace
from cpl.cli.utils.custom_command import script_command
from cpl.core.configuration import Configuration
from cpl.core.console import Console
def _load_workspace(path: str) -> Workspace | None:
path = Path(path)
if not path.exists() or path.is_dir():
return None
return Workspace.from_file(path)
def _load_scripts():
for p in [
"./cpl.workspace.json",
"../cpl.workspace.json",
"../../cpl.workspace.json",
]:
ws = _load_workspace(p)
if ws is None:
continue
Configuration.set("workspace", ws)
return ws.scripts
return {}
def prepare():
scripts = _load_scripts()
for name, command in scripts.items():
script_command(cli, name, command)
def configure():
# cli
cli.add_command(version)
# structure
cli.add_command(init)
cli.add_command(new)
cli.add_command(generate)
# packaging
cli.add_command(install)
cli.add_command(uninstall)
cli.add_command(update)
cli.add_command(add)
cli.add_command(remove)
# run
cli.add_command(build)
cli.add_command(run)
cli.add_command(start)
def main():
prepare()
configure()
try:
cli()
finally:
Console.write_line()
if __name__ == "__main__":
main()
# ((
# ( `)
# ; / ,
# / \/
# / |
# / ~/
# / ) ) ~ edraft
# ___// | /
# `--' \_~-,

View File

@@ -1,22 +0,0 @@
from cpl.cli.model.cpl_sub_structure_model import CPLSubStructureModel
class Build(CPLSubStructureModel):
@staticmethod
def new(include: list[str], exclude: list[str]) -> "Build":
return Build(include, exclude)
def __init__(self, include: list[str], exclude: list[str]):
CPLSubStructureModel.__init__(self)
self._include = include
self._exclude = exclude
@property
def include(self) -> list[str]:
return self._include
@property
def exclude(self) -> list[str]:
return self._exclude

View File

@@ -1,150 +0,0 @@
import inspect
import json
import os
from inspect import isclass
from pathlib import Path
from typing import Any, Dict, List, Optional, Type, TypeVar
from cpl.cli.model.cpl_sub_structure_model import CPLSubStructureModel
T = TypeVar("T", bound="CPLStructureModel")
class CPLStructureModel:
def __init__(self, path: Optional[str] = None, ignore_fields: Optional[List[str]] = None):
self._path = path
self._ignore = {"_ignore", "_path"}
if ignore_fields is not None:
self._ignore.update(ignore_fields)
@property
def path(self) -> Optional[str]:
return self._path
@classmethod
def from_file(cls: Type[T], path: Path | str) -> T:
if isinstance(path, str):
path = Path(path)
with open(path, "r", encoding="utf-8") as f:
data = json.load(f)
return cls.from_json(data, path=path)
@classmethod
def from_json(cls: Type[T], data: Dict[str, Any], path: Optional[Path | str] = None) -> T:
if isinstance(path, str):
path = Path(path)
sig = inspect.signature(cls.__init__)
kwargs: Dict[str, Any] = {}
for name, param in list(sig.parameters.items())[1:]:
if name == "path":
kwargs[name] = str(path)
continue
if isclass(param.annotation) and issubclass(param.annotation, CPLSubStructureModel):
kwargs[name] = param.annotation.from_json(data[name])
continue
if name in data:
kwargs[name] = data[name]
continue
priv = "_" + name
if priv in data:
kwargs[name] = data[priv]
continue
camel = _self_or_cls_snake_to_camel(name)
if camel in data:
kwargs[name] = data[camel]
continue
if param.default is not inspect._empty:
kwargs[name] = param.default
continue
raise KeyError(f"Missing required field '{name}' for {cls.__name__}.")
return cls(**kwargs)
def to_json(self) -> Dict[str, Any]:
result: Dict[str, Any] = {}
for key, value in self.__dict__.items():
if not key.startswith("_") or key in self._ignore:
continue
out_key = _self_or_cls_snake_to_camel(key[1:])
if isinstance(value, CPLSubStructureModel):
value = value.to_json()
result[out_key] = value
return result
def save(self):
if not self._path:
raise ValueError("Cannot save model without a path.")
if not Path(self._path).exists():
os.makedirs(Path(self._path).parent, exist_ok=True)
with open(self._path, "w", encoding="utf-8") as f:
json.dump(self.to_json(), f, indent=2)
@staticmethod
def _require_str(value, field: str, allow_empty: bool = True) -> str:
if not isinstance(value, str):
raise TypeError(f"{field} must be of type str")
if not allow_empty and not value.strip():
raise ValueError(f"{field} must not be empty")
return value
@staticmethod
def _require_optional_non_empty_str(value, field: str) -> Optional[str]:
if value is None:
return None
if not isinstance(value, str):
raise TypeError(f"{field} must be str or None")
s = value.strip()
if not s:
raise ValueError(f"{field} must not be empty when set")
return s
@staticmethod
def _require_list_of_str(value, field: str) -> List[str]:
if not isinstance(value, list):
raise TypeError(f"{field} must be a list")
out: List[str] = []
for i, v in enumerate(value):
if not isinstance(v, str):
raise TypeError(f"{field}[{i}] must be of type str")
s = v.strip()
if s:
out.append(s)
seen = set()
uniq: List[str] = []
for s in out:
if s not in seen:
seen.add(s)
uniq.append(s)
return uniq
@staticmethod
def _require_dict_str_str(value, field: str) -> Dict[str, str]:
if not isinstance(value, dict):
raise TypeError(f"{field} must be a dict")
out: Dict[str, str] = {}
for k, v in value.items():
if not isinstance(k, str) or not k.strip():
raise TypeError(f"Keys in {field} must be non-empty strings")
if not isinstance(v, str) or not v.strip():
raise TypeError(f"Values in {field} must be non-empty strings")
out[k.strip()] = v.strip()
return out
def _self_or_cls_snake_to_camel(s: str) -> str:
parts = s.split("_")
return parts[0] + "".join(p[:1].upper() + p[1:] for p in parts[1:])

View File

@@ -1,104 +0,0 @@
import inspect
import json
from pathlib import Path
from typing import Any, Dict, List, Optional, Type, TypeVar
T = TypeVar("T", bound="CPLSubStructureModel")
class CPLSubStructureModel:
def __init__(self, path: Optional[str] = None):
self._path = path
@classmethod
def from_json(cls: Type[T], data: Dict[str, Any]) -> T:
sig = inspect.signature(cls.__init__)
kwargs: Dict[str, Any] = {}
for name, param in list(sig.parameters.items())[1:]:
if name in data:
kwargs[name] = data[name]
continue
priv = "_" + name
if priv in data:
kwargs[name] = data[priv]
continue
camel = _self_or_cls_snake_to_camel(name)
if camel in data:
kwargs[name] = data[camel]
continue
if param.default is not inspect._empty:
kwargs[name] = param.default
continue
raise KeyError(f"Missing required field '{name}' for {cls.__name__}.")
return cls(**kwargs)
def to_json(self) -> Dict[str, Any]:
result: Dict[str, Any] = {}
for key, value in self.__dict__.items():
if not key.startswith("_") or key == "_path":
continue
out_key = _self_or_cls_snake_to_camel(key[1:])
result[out_key] = value
return result
@staticmethod
def _require_str(value, field: str, allow_empty: bool = True) -> str:
if not isinstance(value, str):
raise TypeError(f"{field} must be of type str")
if not allow_empty and not value.strip():
raise ValueError(f"{field} must not be empty")
return value
@staticmethod
def _require_optional_non_empty_str(value, field: str) -> Optional[str]:
if value is None:
return None
if not isinstance(value, str):
raise TypeError(f"{field} must be str or None")
s = value.strip()
if not s:
raise ValueError(f"{field} must not be empty when set")
return s
@staticmethod
def _require_list_of_str(value, field: str) -> List[str]:
if not isinstance(value, list):
raise TypeError(f"{field} must be a list")
out: List[str] = []
for i, v in enumerate(value):
if not isinstance(v, str):
raise TypeError(f"{field}[{i}] must be of type str")
s = v.strip()
if s:
out.append(s)
seen = set()
uniq: List[str] = []
for s in out:
if s not in seen:
seen.add(s)
uniq.append(s)
return uniq
@staticmethod
def _require_dict_str_str(value, field: str) -> Dict[str, str]:
if not isinstance(value, dict):
raise TypeError(f"{field} must be a dict")
out: Dict[str, str] = {}
for k, v in value.items():
if not isinstance(k, str) or not k.strip():
raise TypeError(f"Keys in {field} must be non-empty strings")
if not isinstance(v, str) or not v.strip():
raise TypeError(f"Values in {field} must be non-empty strings")
out[k.strip()] = v.strip()
return out
def _self_or_cls_snake_to_camel(s: str) -> str:
parts = s.split("_")
return parts[0] + "".join(p[:1].upper() + p[1:] for p in parts[1:])

View File

@@ -1,283 +0,0 @@
import fnmatch
import os
import re
import shutil
from pathlib import Path
from typing import Optional, List, Dict, Self
from urllib.parse import urlparse
from cpl.cli.model.build import Build
from cpl.cli.model.cpl_structure_model import CPLStructureModel
from cpl.core.console import Console
class Project(CPLStructureModel):
_ALLOWED_TYPES = {"application", "library"}
_SEMVER_RE = re.compile(r"^\d+\.\d+\.\d+(?:[-+][0-9A-Za-z.-]+)?$")
@staticmethod
def new(path: str, name: str, project_type: str) -> "Project":
return Project(
path,
name,
"0.1.0",
project_type,
"",
"",
"",
"",
[],
{},
{},
[],
None,
"./",
Build.new([], []),
)
def __init__(
self,
path: str,
name: str,
version: str,
type: str,
license: str,
author: str,
description: str,
homepage: str,
keywords: List[str],
dependencies: Dict[str, str],
dev_dependencies: Dict[str, str],
references: List[str],
main: Optional[str],
directory: str,
build: Build,
):
CPLStructureModel.__init__(self, path)
self._name = name
self._version = version
self._type = type
self._license = license
self._author = author
self._description = description
self._homepage = homepage
self._keywords = keywords
self._dependencies = dependencies
self._dev_dependencies = dev_dependencies
self._references = references
self._main = main
self._directory = directory
self._build = build
@property
def name(self) -> str:
return self._name
@name.setter
def name(self, value: str):
self._name = self._require_str(value, "name", allow_empty=False).strip()
@property
def version(self) -> str:
return self._version
@version.setter
def version(self, value: str):
value = self._require_str(value, "version", allow_empty=False).strip()
if not self._SEMVER_RE.match(value):
raise ValueError("version must follow SemVer X.Y.Z (optionally with -/+)")
self._version = value
@property
def type(self) -> str:
return self._type
@type.setter
def type(self, value: str):
value = self._require_str(value, "type", allow_empty=False).strip()
if value not in self._ALLOWED_TYPES:
allowed = ", ".join(sorted(self._ALLOWED_TYPES))
raise ValueError(f"type must be one of: {allowed}")
self._type = value
@property
def license(self) -> str:
return self._license
@license.setter
def license(self, value: str):
self._license = self._require_str(value, "license", allow_empty=True).strip()
@property
def author(self) -> str:
return self._author
@author.setter
def author(self, value: str):
self._author = self._require_str(value, "author", allow_empty=True).strip()
@property
def description(self) -> str:
return self._description
@description.setter
def description(self, value: str):
self._description = self._require_str(value, "description", allow_empty=True).strip()
@property
def homepage(self) -> str:
return self._homepage
@homepage.setter
def homepage(self, value: str):
value = self._require_str(value, "homepage", allow_empty=True).strip()
if value:
parsed = urlparse(value)
if parsed.scheme not in ("http", "https") or not parsed.netloc:
raise ValueError("homepage must be a valid http/https URL")
self._homepage = value
@property
def keywords(self) -> List[str]:
return self._keywords
@keywords.setter
def keywords(self, value: List[str]):
self._keywords = self._require_list_of_str(value, "keywords")
@property
def dependencies(self) -> Dict[str, str]:
return self._dependencies
@dependencies.setter
def dependencies(self, value: Dict[str, str]):
self._dependencies = self._require_dict_str_str(value, "dependencies")
@property
def dev_dependencies(self) -> Dict[str, str]:
return self._dev_dependencies
@dev_dependencies.setter
def dev_dependencies(self, value: Dict[str, str]):
self._dev_dependencies = self._require_dict_str_str(value, "devDependencies")
@property
def references(self) -> List[str]:
return self._references
@references.setter
def references(self, value: List[str]):
self._references = self._require_list_of_str(value, "references")
@property
def main(self) -> Optional[str]:
return self._main
@main.setter
def main(self, value: Optional[str]):
self._main = self._require_optional_non_empty_str(value, "main")
@property
def directory(self) -> str:
return self._directory
@directory.setter
def directory(self, value: str):
self._directory = self._require_str(value, "directory", allow_empty=False).strip()
@property
def build(self) -> Build:
return self._build
def _collect_files(self, rel_dir: Path) -> List[Path]:
files: List[Path] = []
exclude_patterns = [p.strip() for p in self._build.exclude or []]
exclude_patterns.append("cpl.*.json")
for root, dirnames, filenames in os.walk(rel_dir, topdown=True):
root_path = Path(root)
rel_root = root_path.relative_to(rel_dir).as_posix()
dirnames[:] = [
d
for d in dirnames
if not any(
fnmatch.fnmatch(f"{rel_root}/{d}" if rel_root else d, pattern) or fnmatch.fnmatch(d, pattern)
for pattern in exclude_patterns
)
]
for filename in filenames:
rel_path = f"{rel_root}/{filename}" if rel_root else filename
if any(
fnmatch.fnmatch(rel_path, pattern) or fnmatch.fnmatch(filename, pattern)
for pattern in exclude_patterns
):
continue
files.append(root_path / filename)
return files
def build_references(self, dist: Path, verbose: bool = False):
references = []
old_dir = os.getcwd()
os.chdir(Path(self.path).parent)
for ref in self.references:
os.chdir(Path(ref).parent)
references.append(Project.from_file(ref))
for p in references:
os.chdir(Path(p.path).parent)
p.do_build(dist, verbose, self)
os.chdir(old_dir)
def do_build(self, dist: Path, verbose: bool = False, parent: Self = None, silent: bool = False):
if isinstance(dist, str):
dist = Path(dist)
dist_project = self if parent is None else parent
dist_path = (dist / dist_project.name / self.directory).resolve().absolute()
if parent is None:
if verbose:
Console.write_line(f" Cleaning dist folder at {dist_path}...")
shutil.rmtree(str(dist_path), ignore_errors=True)
if verbose:
Console.write_line(f" Building references for project {self.name}...")
self.build_references(dist, verbose)
def _build():
if verbose:
Console.write_line(f" Collecting project '{self.name}' files...")
rel_dir = (Path(self.path).parent / Path(self.directory)).absolute()
files = self._collect_files(rel_dir)
if len(files) == 0:
if verbose:
Console.write_line(f" No files found in {rel_dir}, skipping copy.")
return
for file in files:
rel_path = file.relative_to(rel_dir)
dest_file_path = dist_path / rel_path
if not dest_file_path.parent.exists():
os.makedirs(dest_file_path.parent, exist_ok=True)
shutil.copy(file, dest_file_path)
if verbose:
Console.write_line(f" Copied {len(files)} files from {rel_dir} to {dist_path}")
if not silent:
Console.write_line(" Done!")
if silent:
_build()
return
Console.spinner(f"Building project {self.name}...", lambda: _build())

View File

@@ -1,92 +0,0 @@
from pathlib import Path
from typing import Optional, List, Dict
from cpl.cli.model.cpl_structure_model import CPLStructureModel
from cpl.cli.model.project import Project
class Workspace(CPLStructureModel):
@staticmethod
def new(path: str, name: str) -> "Workspace":
return Workspace(
path=path,
name=name,
projects=[],
default_project=None,
scripts={},
)
def __init__(
self,
path: str,
name: str,
projects: List[str],
default_project: Optional[str],
scripts: Dict[str, str],
):
CPLStructureModel.__init__(self, path, ["_actual_projects", "_project_names"])
self._name = name
self._projects = projects
self._default_project = default_project
self._actual_projects = []
self._project_names = []
for project in projects:
if Path(project).is_dir() or not Path(project).exists():
raise ValueError(f"Project path '{project}' does not exist or is a directory.")
p = Project.from_file(project)
self._actual_projects.append(p)
self._project_names.append(p.name)
if default_project is not None and default_project not in self._project_names:
raise ValueError(f"Default project '{default_project}' not found in workspace projects.")
self._scripts = scripts
@property
def name(self) -> str:
return self._name
@name.setter
def name(self, value: str):
self._name = self._require_str(value, "name", allow_empty=False).strip()
@property
def projects(self) -> List[str]:
return self._projects
@projects.setter
def projects(self, value: List[str]):
self._projects = self._require_list_of_str(value, "projects")
@property
def actual_projects(self) -> List[Project]:
return self._actual_projects
@property
def project_names(self) -> List[str]:
return self._project_names
@property
def default_project(self) -> Optional[str]:
return self._default_project
@default_project.setter
def default_project(self, value: Optional[str]):
self._default_project = self._require_optional_non_empty_str(value, "defaultProject")
@property
def scripts(self) -> Dict[str, str]:
return self._scripts
@scripts.setter
def scripts(self, value: Dict[str, str]):
self._scripts = self._require_dict_str_str(value, "scripts")
def get_project_by_name(self, name: str) -> Optional[Project]:
for project in self.actual_projects:
if project.name == name:
return project
return None

View File

@@ -1,18 +0,0 @@
import subprocess
import click
def script_command(cli_group, name, command):
@cli_group.command(name)
@click.argument("args", nargs=-1)
def _run_script(args):
click.echo(f"Running script: {name}")
try:
cmd = command.split() + list(args)
subprocess.run(cmd, check=True)
except subprocess.CalledProcessError as e:
click.echo(f"Script '{name}' failed with exit code {e.returncode}", err=True)
except FileNotFoundError:
click.echo(f"Command not found: {command.split()[0]}", err=True)

View File

@@ -1,14 +0,0 @@
import json
from pathlib import Path
def load_project_json(path: Path) -> dict:
if not path.exists():
return {}
with open(path, "r", encoding="utf-8") as f:
return json.load(f)
def save_project_json(path: Path, data: dict):
with open(path, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2, ensure_ascii=False)

View File

@@ -1,51 +0,0 @@
import re
import time
from watchdog.events import FileSystemEventHandler, FileModifiedEvent, FileClosedEvent
class FileEventHandler(FileSystemEventHandler):
_ignore_patterns = [
r".*~$",
r".*\.swp$",
r".*\.swx$",
r".*\.tmp$",
r".*__pycache__.*",
r".*\.pytest_cache.*",
r".*/\.idea/.*",
r".*/\.vscode/.*",
r".*\.DS_Store$",
r"#.*#$",
]
_watch_extensions = (".py", ".json", ".yaml", ".yml", ".toml")
def __init__(self, on_save, debounce_seconds: float = 0.5):
super().__init__()
self._on_save = on_save
self._debounce = debounce_seconds
self._last_triggered = 0
self._last_file = None
def _should_ignore(self, path: str) -> bool:
for pattern in self._ignore_patterns:
if re.match(pattern, path):
return True
return not path.endswith(self._watch_extensions)
def _debounced_trigger(self, path: str):
now = time.time()
if path != self._last_file or now - self._last_triggered > self._debounce:
self._last_triggered = now
self._last_file = path
self._on_save(path)
def on_modified(self, event):
if not event.is_directory and isinstance(event, FileModifiedEvent):
if not self._should_ignore(event.src_path):
self._debounced_trigger(event.src_path)
def on_closed(self, event):
if not event.is_directory and isinstance(event, FileClosedEvent):
if not self._should_ignore(event.src_path):
self._debounced_trigger(event.src_path)

View File

@@ -1,103 +0,0 @@
import os
import subprocess
import time
from pathlib import Path
from watchdog.observers import Observer
from cpl.cli.model.project import Project
from cpl.cli.utils.live_server.file_event_handler import FileEventHandler
from cpl.cli.utils.venv import get_venv_python, ensure_venv
from cpl.core.configuration import Configuration
from cpl.core.console import Console
class LiveServer:
def __init__(self, project: Project, args: list[str], dev: bool = False, verbose: bool = False):
path = str(Path(project.path).parent.resolve().absolute())
executable = (Path(project.path).parent / Path(project.directory) / project.main).resolve().absolute()
self._dist_path = None
if not dev:
self._dist_path = Path(project.path).parent / "dist"
if Configuration.get("workspace") is not None:
self._dist_path = Path(Configuration.get("workspace").path).parent / "dist"
self._dist_path = Path(self._dist_path).resolve().absolute()
if verbose:
Console.write_line(f"Creating dist folder at {self._dist_path}...")
os.makedirs(self._dist_path, exist_ok=True)
project.do_build(self._dist_path, verbose)
path = self._dist_path / project.name / project.directory
main = project.main.replace(project.directory, "").lstrip("/\\")
executable = (path / main).resolve().absolute()
if not os.path.isfile(executable):
Console.error(f"Executable {executable} not found.")
return
self._project = project
self._sources = (Path(project.path).parent / Path(project.directory)).resolve().absolute()
self._executable = executable
self._working_directory = Path(path)
self._args = args
self._is_dev = dev
self._verbose = verbose
self._process = None
self._observer = None
self._python = str(get_venv_python(ensure_venv(Path("./"))).absolute())
def _run_executable(self):
if self._process:
self._process.terminate()
self._process.wait()
self._process = subprocess.Popen(
[self._python, str(self._executable), *self._args],
cwd=self._working_directory,
)
def _on_change(self, changed_file: str):
if self._verbose:
Console.write_line(f"Change detected: {changed_file}")
if self._is_dev and self._process:
self._process.terminate()
self._process.wait()
Console.write_line("Restart\n\n")
time.sleep(0.5) # debounce to avoid copy temp files
if not self._is_dev:
self._project.do_build(self._dist_path, verbose=self._verbose, silent=True)
self._run_executable()
def start(self):
handler = FileEventHandler(self._on_change)
observer = Observer()
observer.schedule(handler, str(self._sources), recursive=True)
observer.start()
self._observer = observer
Console.write_line("** CPL live development server is running **\n")
Console.write_line(f"Watching {self._sources} ... (Ctrl+C to stop)")
Console.write_line(f"Starting {self._executable} ...\n\n")
self._run_executable()
try:
while True:
time.sleep(1)
except KeyboardInterrupt as e:
time.sleep(1)
Console.write_line("Stopping...")
finally:
if self._process:
self._process.terminate()
self._process.wait()
observer.stop()
observer.join()
Console.close()

View File

@@ -1,13 +0,0 @@
class NameUtils:
@staticmethod
def classify(name: str) -> str: # UserService
return "".join(w.capitalize() for w in name.replace("-", "_").split("_"))
@staticmethod
def dasherize(name: str) -> str: # user-service
return name.replace("_", "-").lower()
@staticmethod
def camelize(name: str) -> str: # userService
parts = name.split("-")
return parts[0] + "".join(w.capitalize() for w in parts[1:])

View File

@@ -1,177 +0,0 @@
import os.path
import re
import subprocess
from pathlib import Path
from cpl.cli.utils.venv import ensure_venv, get_venv_pip
from cpl.core.console import Console
class Pip:
_ANY_PREFIX_RE = re.compile(r"(===|~=|==|!=|>=|<=|>>|<<|>|<|!|~|=|\^)")
@staticmethod
def normalize_dep(name: str, raw: str) -> str:
raw = raw.strip()
if not raw:
return name
table = {
"!": "!=",
">": ">=",
">>": ">",
"<": "<=",
"<<": "<",
"~": "~=",
"=": "===",
}
op = "=="
for prefix, pip_op in table.items():
if raw.startswith(prefix):
op = pip_op
raw = raw[len(prefix) :]
break
return f"{name}{op}{raw}"
@classmethod
def apply_prefix(cls, installed: str, spec: str = None) -> str:
if spec is None or not spec.strip():
return f"~{installed}"
s = spec.strip()
if "," in s:
return s
m = cls._ANY_PREFIX_RE.search(s)
if not m:
return f"~{installed}"
op = m.group(1)
rest = s[m.end() :].strip()
if "," in rest:
rest = rest.split(",", 1)[0].strip()
if " " in rest:
rest = rest.split()[0]
orig_version = rest
installed_parts = [p for p in installed.split(".") if p != ""]
if orig_version:
orig_parts = [p for p in orig_version.split(".") if p != ""]
trimmed_installed = ".".join(installed_parts[: len(orig_parts)]) or installed
else:
trimmed_installed = installed
pip_to_cpl = {
"==": "",
"!=": "!",
">=": ">",
">": ">>",
"<=": "<",
"<": "<<",
"~=": "~",
"===": "=",
"^": "~",
}
if op in pip_to_cpl:
cpl_op = pip_to_cpl[op]
else:
cpl_op = op
return f"{cpl_op}{trimmed_installed}"
@classmethod
def get_package_without_version(cls, spec: str) -> str:
for sep in ["==", ">=", "<=", ">", "<", "~=", "!="]:
if sep in spec:
return spec.split(sep, 1)[0].strip() or None
return spec.strip() or None
@classmethod
def get_package_full_version(cls, spec: str) -> str | None:
package_name = cls.get_package_without_version(spec)
return spec.replace(package_name, "").strip() or None
@staticmethod
def command(command: str, *args, verbose: bool = False, path: Path = None):
if path is not None and path.is_file():
path = os.path.dirname(path)
venv_path = ensure_venv(Path(os.getcwd()) / Path(path or "./"))
pip = get_venv_pip(venv_path)
if verbose:
Console.write_line()
Console.write_line(f"Running: {pip} {command} {''.join(args)}")
subprocess.run(
[*pip.split(), *command.split(), *args],
check=True,
cwd=path,
stdin=subprocess.DEVNULL if not verbose else None,
stdout=subprocess.DEVNULL if not verbose else None,
stderr=subprocess.DEVNULL if not verbose else None,
)
@staticmethod
def get_package_version(package: str, path: str = None) -> str | None:
venv_path = ensure_venv(Path(path or "./"))
pip = get_venv_pip(venv_path)
try:
result = subprocess.run(
[*pip.split(), "show", package],
check=True,
capture_output=True,
text=True,
stdin=subprocess.DEVNULL,
)
for line in result.stdout.splitlines():
if line.startswith("Version:"):
return line.split(":", 1)[1].strip()
except subprocess.CalledProcessError:
return None
return None
@staticmethod
def get_packages(path: str = None):
venv_path = ensure_venv(Path(path or "./"))
pip = get_venv_pip(venv_path)
try:
result = subprocess.run(
[*pip.split(), "list", "--format=freeze"],
check=True,
capture_output=True,
text=True,
stdin=subprocess.DEVNULL,
)
packages = {}
for line in result.stdout.splitlines():
if "==" in line:
name, version = line.split("==", 1)
packages[name] = version
return packages
except subprocess.CalledProcessError:
return {}
@staticmethod
def get_pip_version(path: str = None) -> str | None:
venv_path = ensure_venv(Path(path or "./"))
pip = get_venv_pip(venv_path)
try:
result = subprocess.run(
[*pip.split(), "--version"],
check=True,
capture_output=True,
text=True,
stdin=subprocess.DEVNULL,
)
version = result.stdout.split()[1]
return version
except subprocess.CalledProcessError:
return None

View File

@@ -1,28 +0,0 @@
import click
from cpl.cli.const import PROJECT_TYPES
class SmartChoice(click.Choice):
def __init__(self, choices: list, aliases: dict = None):
click.Choice.__init__(self, choices, case_sensitive=False)
self._aliases = {c: c[0].lower() for c in choices if len(c) > 0}
if aliases:
self._aliases.update({k: v.lower() for k, v in aliases.items()})
if any([x[0].lower in self._aliases for x in choices if len(x) > 1]):
raise ValueError("Alias conflict with first letters of choices")
def convert(self, value, param, ctx):
val_lower = value.lower()
if val_lower in self._aliases.values():
value = [k for k, v in self._aliases.items() if v == val_lower][0]
return super().convert(value, param, ctx)
def get_metavar(self, param, ctx):
return "|".join([f"({a}){option}" for option, a in self._aliases.items()])
ProjectType = SmartChoice(["workspace"] + PROJECT_TYPES, {"workspace": "ws"})

Some files were not shown because too many files have changed in this diff Show More