Added hosted services #186
Some checks failed
Test before pr merge / test-lint (pull_request) Failing after 6s
Build on push / prepare (push) Successful in 8s
Build on push / query (push) Successful in 18s
Build on push / core (push) Successful in 18s
Build on push / dependency (push) Successful in 17s
Build on push / application (push) Successful in 15s
Build on push / translation (push) Successful in 17s
Build on push / database (push) Successful in 18s
Build on push / mail (push) Successful in 24s
Build on push / auth (push) Successful in 14s
Build on push / api (push) Successful in 14s

This commit is contained in:
2025-09-25 00:53:17 +02:00
parent 75417966eb
commit 0ca5e5757a
11 changed files with 115 additions and 18 deletions

View File

@@ -1 +1,2 @@
from .application_builder import ApplicationBuilder
from .host import Host

View File

@@ -84,7 +84,7 @@ class ApplicationABC(ABC):
Called by custom Application.main
"""
try:
Host.run(self.main)
Host.run_app(self.main)
except KeyboardInterrupt:
pass

View File

@@ -6,27 +6,78 @@ from cpl.dependency.hosted.startup_task import StartupTask
class Host:
_loop = asyncio.get_event_loop()
_loop: asyncio.AbstractEventLoop | None = None
_tasks: dict = {}
@classmethod
def get_loop(cls):
def get_loop(cls) -> asyncio.AbstractEventLoop:
if cls._loop is None:
cls._loop = asyncio.new_event_loop()
asyncio.set_event_loop(cls._loop)
return cls._loop
@classmethod
def run_start_tasks(cls):
provider = get_provider()
tasks = provider.get_services(StartupTask)
loop = cls.get_loop()
for task in tasks:
if asyncio.iscoroutinefunction(task.run):
cls._loop.run_until_complete(task.run())
loop.run_until_complete(task.run())
else:
task.run()
@classmethod
def run(cls, func: Callable, *args, **kwargs):
cls.run_start_tasks()
def run_hosted_services(cls):
provider = get_provider()
services = provider.get_hosted_services()
loop = cls.get_loop()
for service in services:
if asyncio.iscoroutinefunction(service.start):
cls._tasks[service] = loop.create_task(service.start())
@classmethod
async def _stop_all(cls):
for service in cls._tasks.keys():
if asyncio.iscoroutinefunction(service.stop):
await service.stop()
for task in cls._tasks.values():
task.cancel()
cls._tasks.clear()
@classmethod
def run_app(cls, func: Callable, *args, **kwargs):
loop = cls.get_loop()
cls.run_start_tasks()
cls.run_hosted_services()
async def runner():
try:
if asyncio.iscoroutinefunction(func):
app_task = asyncio.create_task(func(*args, **kwargs))
else:
loop = asyncio.get_running_loop()
app_task = loop.run_in_executor(None, func, *args, **kwargs)
await asyncio.wait(
[app_task, *cls._tasks.values()],
return_when=asyncio.FIRST_COMPLETED,
)
except (KeyboardInterrupt, asyncio.CancelledError):
pass
finally:
await cls._stop_all()
loop.run_until_complete(runner())
@classmethod
def run(cls, func: Callable, *args, **kwargs):
if asyncio.iscoroutinefunction(func):
return cls._loop.run_until_complete(func(*args, **kwargs))
return func(*args, **kwargs)
return func(*args, **kwargs)

View File

@@ -0,0 +1,2 @@
from .hosted_service import HostedService
from .startup_task import StartupTask

View File

@@ -0,0 +1,9 @@
from abc import ABC, abstractmethod
class HostedService(ABC):
@abstractmethod
async def start(self): ...
@abstractmethod
async def stop(self): ...

View File

@@ -66,6 +66,10 @@ class ServiceCollection:
self.add_singleton(StartupTask, task)
return self
def add_hosted_service(self, service_type: T, service: Service = None) -> Self:
self._add_descriptor_by_lifetime(service_type, ServiceLifetimeEnum.hosted, service)
return self
def build(self) -> ServiceProvider:
sp = ServiceProvider(self._service_descriptors)
return sp

View File

@@ -5,3 +5,4 @@ class ServiceLifetimeEnum(Enum):
singleton = auto()
scoped = auto()
transient = auto()
hosted = auto()

View File

@@ -7,7 +7,7 @@ from typing import Optional, Type
from cpl.core.configuration import Configuration
from cpl.core.configuration.configuration_model_abc import ConfigurationModelABC
from cpl.core.environment import Environment
from cpl.core.typing import T, R, Source
from cpl.core.typing import T, Source
from cpl.dependency import use_provider
from cpl.dependency.service_descriptor import ServiceDescriptor
from cpl.dependency.service_lifetime import ServiceLifetimeEnum
@@ -54,9 +54,7 @@ class ServiceProvider:
return implementation
# raise Exception(f'Service {parameter.annotation} not found')
def _get_services(self, t: type, service_type: type = None, **kwargs) -> list[Optional[object]]:
def _get_services(self, t: type, *args, service_type: type = None, **kwargs) -> list[Optional[object]]:
implementations = []
for descriptor in self._service_descriptors:
if descriptor.service_type == t or issubclass(descriptor.service_type, t):
@@ -65,7 +63,7 @@ class ServiceProvider:
continue
implementation = self._build_service(
descriptor.service_type, origin_service_type=service_type, **kwargs
descriptor.service_type, *args, origin_service_type=service_type, **kwargs
)
if descriptor.lifetime in (ServiceLifetimeEnum.singleton, ServiceLifetimeEnum.scoped):
descriptor.implementation = implementation
@@ -74,7 +72,7 @@ class ServiceProvider:
return implementations
def _build_by_signature(self, sig: Signature, origin_service_type: type = None) -> list[R]:
def _build_by_signature(self, sig: Signature, origin_service_type: type = None) -> list[T]:
params = []
for param in sig.parameters.items():
parameter = param[1]
@@ -132,7 +130,11 @@ class ServiceProvider:
with use_provider(scoped_provider):
yield scoped_provider
def get_service(self, service_type: T, *args, **kwargs) -> Optional[R]:
def get_hosted_services(self) -> list[Optional[T]]:
hosted_services = [self.get_service(d.service_type) for d in self._service_descriptors if d.lifetime == ServiceLifetimeEnum.hosted]
return hosted_services
def get_service(self, service_type: Type[T], *args, **kwargs) -> Optional[T]:
result = self._find_service(service_type)
if result is None:
return None
@@ -155,11 +157,11 @@ class ServiceProvider:
return descriptor.service_type
return None
def get_services(self, service_type: T, *args, **kwargs) -> list[Optional[R]]:
def get_services(self, service_type: Type[T], *args, **kwargs) -> list[Optional[T]]:
implementations = []
if typing.get_origin(service_type) == list:
raise Exception(f"Invalid type {service_type}! Expected single type not list of type")
implementations.extend(self._get_services(service_type))
implementations.extend(self._get_services(service_type, None, *args, **kwargs))
return implementations
def get_service_types(self, service_type: Type[T]) -> list[Type[T]]: