Compare commits

..

3 Commits

Author SHA1 Message Date
2bb264b8c1 Added hosted services #186
Some checks failed
Test before pr merge / test-lint (pull_request) Failing after 6s
Build on push / prepare (push) Successful in 9s
Build on push / core (push) Successful in 18s
Build on push / query (push) Successful in 18s
Build on push / dependency (push) Successful in 17s
Build on push / api (push) Has been cancelled
Build on push / application (push) Has started running
Build on push / auth (push) Has been cancelled
Build on push / database (push) Has started running
Build on push / mail (push) Has been cancelled
Build on push / translation (push) Has been cancelled
2025-09-25 00:53:17 +02:00
75417966eb Moved general example 2025-09-25 00:15:26 +02:00
15d3c59f02 StartupTask #186 2025-09-25 00:11:26 +02:00
31 changed files with 169 additions and 44 deletions

View File

@@ -9,7 +9,7 @@ from cpl.core.configuration import Configuration
from cpl.core.console import Console
from cpl.core.environment import Environment
from cpl.core.utils.cache import Cache
from custom.api.src.scoped_service import ScopedService
from scoped_service import ScopedService
from service import PingService

View File

@@ -7,7 +7,7 @@ from cpl.api import APILogger
from cpl.api.router import Router
from cpl.core.console import Console
from cpl.dependency import ServiceProvider
from custom.api.src.scoped_service import ScopedService
from scoped_service import ScopedService
@Router.authenticate()

View File

@@ -1,16 +1,15 @@
import time
from typing import Optional
from cpl.application.abc import ApplicationABC
from cpl.core.configuration import Configuration
from cpl.core.console import Console
from cpl.dependency import ServiceProvider
from cpl.core.environment import Environment
from cpl.core.log import LoggerABC
from cpl.core.pipes import IPAddressPipe
from cpl.dependency import ServiceProvider
from cpl.mail import EMail, EMailClientABC
from cpl.query import List
from general.scoped_service import ScopedService
from scoped_service import ScopedService
from test_service import TestService
from test_settings import TestSettings
@@ -74,3 +73,9 @@ class Application(ApplicationABC):
test_settings1 = Configuration.get(TestSettings)
Console.write_line(test_settings1.value)
# self.test_send_mail()
x = 0
while x < 5:
Console.write_line("Running...")
x += 1
time.sleep(5)

View File

@@ -0,0 +1,20 @@
import asyncio
import time
from cpl.core.console import Console
from cpl.dependency.hosted.hosted_service import HostedService
class Hosted(HostedService):
def __init__(self):
self._stopped = False
async def start(self):
Console.write_line("Hosted Service Started")
while not self._stopped:
Console.write_line("Hosted Service Running")
await asyncio.sleep(5)
async def stop(self):
Console.write_line("Hosted Service Stopped")
self._stopped = True

View File

@@ -4,7 +4,8 @@ from cpl.core.configuration import Configuration
from cpl.core.environment import Environment
from cpl.core.pipes import IPAddressPipe
from cpl.dependency import ServiceCollection
from general.scoped_service import ScopedService
from example.custom.general.src.hosted_service import Hosted
from scoped_service import ScopedService
from test_service import TestService
@@ -23,3 +24,4 @@ class Startup(StartupABC):
services.add_transient(IPAddressPipe)
services.add_singleton(TestService)
services.add_scoped(ScopedService)
services.add_hosted_service(Hosted)

View File

@@ -84,7 +84,7 @@ class ApplicationABC(ABC):
Called by custom Application.main
"""
try:
Host.run(self.main)
Host.run_app(self.main)
except KeyboardInterrupt:
pass

View File

@@ -1,17 +1,83 @@
import asyncio
from typing import Callable
from cpl.dependency import get_provider
from cpl.dependency.hosted.startup_task import StartupTask
class Host:
_loop = asyncio.get_event_loop()
_loop: asyncio.AbstractEventLoop | None = None
_tasks: dict = {}
@classmethod
def get_loop(cls):
def get_loop(cls) -> asyncio.AbstractEventLoop:
if cls._loop is None:
cls._loop = asyncio.new_event_loop()
asyncio.set_event_loop(cls._loop)
return cls._loop
@classmethod
def run_start_tasks(cls):
provider = get_provider()
tasks = provider.get_services(StartupTask)
loop = cls.get_loop()
for task in tasks:
if asyncio.iscoroutinefunction(task.run):
loop.run_until_complete(task.run())
else:
task.run()
@classmethod
def run_hosted_services(cls):
provider = get_provider()
services = provider.get_hosted_services()
loop = cls.get_loop()
for service in services:
if asyncio.iscoroutinefunction(service.start):
cls._tasks[service] = loop.create_task(service.start())
@classmethod
async def _stop_all(cls):
for service in cls._tasks.keys():
if asyncio.iscoroutinefunction(service.stop):
await service.stop()
for task in cls._tasks.values():
task.cancel()
cls._tasks.clear()
@classmethod
def run_app(cls, func: Callable, *args, **kwargs):
loop = cls.get_loop()
cls.run_start_tasks()
cls.run_hosted_services()
async def runner():
try:
if asyncio.iscoroutinefunction(func):
app_task = asyncio.create_task(func(*args, **kwargs))
else:
loop = asyncio.get_running_loop()
app_task = loop.run_in_executor(None, func, *args, **kwargs)
await asyncio.wait(
[app_task, *cls._tasks.values()],
return_when=asyncio.FIRST_COMPLETED,
)
except (KeyboardInterrupt, asyncio.CancelledError):
pass
finally:
await cls._stop_all()
loop.run_until_complete(runner())
@classmethod
def run(cls, func: Callable, *args, **kwargs):
if asyncio.iscoroutinefunction(func):
return cls._loop.run_until_complete(func(*args, **kwargs))
return func(*args, **kwargs)
return func(*args, **kwargs)

View File

@@ -9,7 +9,6 @@ from .table_manager import TableManager
def _with_migrations(self: _ApplicationABC, *paths: str | list[str]) -> _ApplicationABC:
from cpl.application.host import Host
from cpl.database.service.migration_service import MigrationService
migration_service = self._services.get_service(MigrationService)
@@ -21,8 +20,6 @@ def _with_migrations(self: _ApplicationABC, *paths: str | list[str]) -> _Applica
for path in paths:
migration_service.with_directory(path)
Host.run(migration_service.migrate)
return self

View File

@@ -11,6 +11,7 @@ from cpl.database.abc.db_context_abc import DBContextABC
from cpl.database.const import DATETIME_FORMAT
from cpl.database.external_data_temp_table_builder import ExternalDataTempTableBuilder
from cpl.database.logger import DBLogger
from cpl.database.model.server_type import ServerType, ServerTypes
from cpl.database.postgres.sql_select_builder import SQLSelectBuilder
from cpl.database.typing import T_DBM, Attribute, AttributeFilters, AttributeSorts
from cpl.dependency import get_provider
@@ -351,13 +352,13 @@ class DataAccessObjectABC(ABC, Generic[T_DBM]):
values = f"{await self._get_editor_id(obj) if not skip_editor else ''}{f', {values}' if not skip_editor and len(values) > 0 else f'{values}'}"
return f"""
INSERT INTO {self._table_name} (
{fields}
) VALUES (
{values}
)
RETURNING {self.__primary_key};
"""
INSERT INTO {self._table_name} (
{fields}
) VALUES (
{values}
)
{"RETURNING {self.__primary_key};" if ServerType.server_type == ServerTypes.POSTGRES else ";SELECT LAST_INSERT_ID();"}
"""
async def create(self, obj: T_DBM, skip_editor=False) -> int:
self._logger.debug(f"create {type(obj).__name__} {obj.__dict__}")

View File

@@ -6,7 +6,7 @@ from mysql.connector.aio import MySQLConnectionPool
from cpl.core.environment import Environment
from cpl.database.logger import DBLogger
from cpl.database.model import DatabaseSettings
from cpl.dependency import ServiceProvider
from cpl.dependency.context import get_provider
class MySQLPool:
@@ -18,7 +18,11 @@ class MySQLPool:
"user": database_settings.user,
"password": database_settings.password,
"database": database_settings.database,
"ssl_disabled": True,
"charset": database_settings.charset,
"use_unicode": database_settings.use_unicode,
"buffered": database_settings.buffered,
"auth_plugin": database_settings.auth_plugin,
"ssl_disabled": False,
}
self._pool: Optional[MySQLConnectionPool] = None

View File

@@ -7,14 +7,16 @@ from cpl.database.model import Migration
from cpl.database.model.server_type import ServerType, ServerTypes
from cpl.database.schema.executed_migration import ExecutedMigration
from cpl.database.schema.executed_migration_dao import ExecutedMigrationDao
from cpl.dependency.hosted.startup_task import StartupTask
class MigrationService:
class MigrationService(StartupTask):
def __init__(self, logger: DBLogger, db: DBContextABC, executedMigrationDao: ExecutedMigrationDao):
def __init__(self, logger: DBLogger, db: DBContextABC, executed_migration_dao: ExecutedMigrationDao):
StartupTask.__init__(self)
self._logger = logger
self._db = db
self._executedMigrationDao = executedMigrationDao
self._executed_migration_dao = executed_migration_dao
self._script_directories: list[str] = []
@@ -23,12 +25,15 @@ class MigrationService:
elif ServerType.server_type == ServerTypes.MYSQL:
self.with_directory(os.path.join(os.path.dirname(os.path.realpath(__file__)), "../scripts/mysql"))
async def run(self):
await self._execute(self._load_scripts())
def with_directory(self, directory: str) -> "MigrationService":
self._script_directories.append(directory)
return self
async def _get_migration_history(self) -> list[ExecutedMigration]:
results = await self._db.select(f"SELECT * FROM {self._executedMigrationDao.table_name}")
results = await self._db.select(f"SELECT * FROM {self._executed_migration_dao.table_name}")
applied_migrations = []
for result in results:
applied_migrations.append(ExecutedMigration(result[0]))
@@ -91,7 +96,7 @@ class MigrationService:
try:
# check if table exists
if len(result) > 0:
migration_from_db = await self._executedMigrationDao.find_by_id(migration.name)
migration_from_db = await self._executed_migration_dao.find_by_id(migration.name)
if migration_from_db is not None:
continue
@@ -99,12 +104,9 @@ class MigrationService:
await self._db.execute(migration.script, multi=True)
await self._executedMigrationDao.create(ExecutedMigration(migration.name), skip_editor=True)
await self._executed_migration_dao.create(ExecutedMigration(migration.name), skip_editor=True)
except Exception as e:
self._logger.fatal(
f"Migration failed: {migration.name}\n{active_statement}",
e,
)
async def migrate(self):
await self._execute(self._load_scripts())

View File

@@ -1,15 +1,16 @@
import contextvars
from contextlib import contextmanager
_current_provider = contextvars.ContextVar("current_provider", default=None)
_current_provider = contextvars.ContextVar("current_provider")
def use_root_provider(provider):
def use_root_provider(provider: "ServiceProvider"):
_current_provider.set(provider)
@contextmanager
def use_provider(provider):
def use_provider(provider: "ServiceProvider"):
token = _current_provider.set(provider)
try:
yield
@@ -17,5 +18,5 @@ def use_provider(provider):
_current_provider.reset(token)
def get_provider():
def get_provider() -> "ServiceProvider":
return _current_provider.get()

View File

@@ -0,0 +1,9 @@
from abc import ABC, abstractmethod
class HostedService(ABC):
@abstractmethod
async def start(self): ...
@abstractmethod
async def stop(self): ...

View File

@@ -0,0 +1,6 @@
from abc import ABC, abstractmethod
class StartupTask(ABC):
@abstractmethod
async def run(self): ...

View File

@@ -3,6 +3,7 @@ from typing import Union, Type, Callable, Self
from cpl.core.log.logger_abc import LoggerABC
from cpl.core.typing import T, Service
from cpl.core.utils.cache import Cache
from cpl.dependency.hosted.startup_task import StartupTask
from cpl.dependency.service_descriptor import ServiceDescriptor
from cpl.dependency.service_lifetime import ServiceLifetimeEnum
from cpl.dependency.service_provider import ServiceProvider
@@ -61,6 +62,14 @@ class ServiceCollection:
self._add_descriptor_by_lifetime(service_type, ServiceLifetimeEnum.transient, service)
return self
def add_startup_task(self, task: Type[StartupTask]) -> Self:
self.add_singleton(StartupTask, task)
return self
def add_hosted_service(self, service_type: T, service: Service = None) -> Self:
self._add_descriptor_by_lifetime(service_type, ServiceLifetimeEnum.hosted, service)
return self
def build(self) -> ServiceProvider:
sp = ServiceProvider(self._service_descriptors)
return sp

View File

@@ -5,3 +5,4 @@ class ServiceLifetimeEnum(Enum):
singleton = auto()
scoped = auto()
transient = auto()
hosted = auto()

View File

@@ -7,7 +7,7 @@ from typing import Optional, Type
from cpl.core.configuration import Configuration
from cpl.core.configuration.configuration_model_abc import ConfigurationModelABC
from cpl.core.environment import Environment
from cpl.core.typing import T, R, Source
from cpl.core.typing import T, Source
from cpl.dependency import use_provider
from cpl.dependency.service_descriptor import ServiceDescriptor
from cpl.dependency.service_lifetime import ServiceLifetimeEnum
@@ -54,9 +54,7 @@ class ServiceProvider:
return implementation
# raise Exception(f'Service {parameter.annotation} not found')
def _get_services(self, t: type, service_type: type = None, **kwargs) -> list[Optional[object]]:
def _get_services(self, t: type, *args, service_type: type = None, **kwargs) -> list[Optional[object]]:
implementations = []
for descriptor in self._service_descriptors:
if descriptor.service_type == t or issubclass(descriptor.service_type, t):
@@ -65,7 +63,7 @@ class ServiceProvider:
continue
implementation = self._build_service(
descriptor.service_type, origin_service_type=service_type, **kwargs
descriptor.service_type, *args, origin_service_type=service_type, **kwargs
)
if descriptor.lifetime in (ServiceLifetimeEnum.singleton, ServiceLifetimeEnum.scoped):
descriptor.implementation = implementation
@@ -74,7 +72,7 @@ class ServiceProvider:
return implementations
def _build_by_signature(self, sig: Signature, origin_service_type: type = None) -> list[R]:
def _build_by_signature(self, sig: Signature, origin_service_type: type = None) -> list[T]:
params = []
for param in sig.parameters.items():
parameter = param[1]
@@ -132,7 +130,11 @@ class ServiceProvider:
with use_provider(scoped_provider):
yield scoped_provider
def get_service(self, service_type: T, *args, **kwargs) -> Optional[R]:
def get_hosted_services(self) -> list[Optional[T]]:
hosted_services = [self.get_service(d.service_type) for d in self._service_descriptors if d.lifetime == ServiceLifetimeEnum.hosted]
return hosted_services
def get_service(self, service_type: Type[T], *args, **kwargs) -> Optional[T]:
result = self._find_service(service_type)
if result is None:
return None
@@ -155,11 +157,11 @@ class ServiceProvider:
return descriptor.service_type
return None
def get_services(self, service_type: T, *args, **kwargs) -> list[Optional[R]]:
def get_services(self, service_type: Type[T], *args, **kwargs) -> list[Optional[T]]:
implementations = []
if typing.get_origin(service_type) == list:
raise Exception(f"Invalid type {service_type}! Expected single type not list of type")
implementations.extend(self._get_services(service_type))
implementations.extend(self._get_services(service_type, None, *args, **kwargs))
return implementations
def get_service_types(self, service_type: Type[T]) -> list[Type[T]]: