diff --git a/example/custom/general/src/application.py b/example/custom/general/src/application.py index d88454ba..e901097f 100644 --- a/example/custom/general/src/application.py +++ b/example/custom/general/src/application.py @@ -1,13 +1,12 @@ import time -from typing import Optional from cpl.application.abc import ApplicationABC from cpl.core.configuration import Configuration from cpl.core.console import Console -from cpl.dependency import ServiceProvider from cpl.core.environment import Environment from cpl.core.log import LoggerABC from cpl.core.pipes import IPAddressPipe +from cpl.dependency import ServiceProvider from cpl.mail import EMail, EMailClientABC from cpl.query import List from scoped_service import ScopedService @@ -74,3 +73,9 @@ class Application(ApplicationABC): test_settings1 = Configuration.get(TestSettings) Console.write_line(test_settings1.value) # self.test_send_mail() + + x = 0 + while x < 5: + Console.write_line("Running...") + x += 1 + time.sleep(5) diff --git a/example/custom/general/src/hosted_service.py b/example/custom/general/src/hosted_service.py new file mode 100644 index 00000000..9ec96afc --- /dev/null +++ b/example/custom/general/src/hosted_service.py @@ -0,0 +1,20 @@ +import asyncio +import time + +from cpl.core.console import Console +from cpl.dependency.hosted.hosted_service import HostedService + + +class Hosted(HostedService): + def __init__(self): + self._stopped = False + + async def start(self): + Console.write_line("Hosted Service Started") + while not self._stopped: + Console.write_line("Hosted Service Running") + await asyncio.sleep(5) + + async def stop(self): + Console.write_line("Hosted Service Stopped") + self._stopped = True \ No newline at end of file diff --git a/example/custom/general/src/startup.py b/example/custom/general/src/startup.py index a195c8ed..18ddfba6 100644 --- a/example/custom/general/src/startup.py +++ b/example/custom/general/src/startup.py @@ -4,6 +4,7 @@ from cpl.core.configuration import Configuration from cpl.core.environment import Environment from cpl.core.pipes import IPAddressPipe from cpl.dependency import ServiceCollection +from example.custom.general.src.hosted_service import Hosted from scoped_service import ScopedService from test_service import TestService @@ -23,3 +24,4 @@ class Startup(StartupABC): services.add_transient(IPAddressPipe) services.add_singleton(TestService) services.add_scoped(ScopedService) + services.add_hosted_service(Hosted) diff --git a/src/cpl-application/cpl/application/__init__.py b/src/cpl-application/cpl/application/__init__.py index 34366e07..99ff5f7d 100644 --- a/src/cpl-application/cpl/application/__init__.py +++ b/src/cpl-application/cpl/application/__init__.py @@ -1 +1,2 @@ from .application_builder import ApplicationBuilder +from .host import Host \ No newline at end of file diff --git a/src/cpl-application/cpl/application/abc/application_abc.py b/src/cpl-application/cpl/application/abc/application_abc.py index f4838d06..1395402a 100644 --- a/src/cpl-application/cpl/application/abc/application_abc.py +++ b/src/cpl-application/cpl/application/abc/application_abc.py @@ -84,7 +84,7 @@ class ApplicationABC(ABC): Called by custom Application.main """ try: - Host.run(self.main) + Host.run_app(self.main) except KeyboardInterrupt: pass diff --git a/src/cpl-application/cpl/application/host.py b/src/cpl-application/cpl/application/host.py index b540f33f..5571d125 100644 --- a/src/cpl-application/cpl/application/host.py +++ b/src/cpl-application/cpl/application/host.py @@ -6,27 +6,78 @@ from cpl.dependency.hosted.startup_task import StartupTask class Host: - _loop = asyncio.get_event_loop() + _loop: asyncio.AbstractEventLoop | None = None + _tasks: dict = {} @classmethod - def get_loop(cls): + def get_loop(cls) -> asyncio.AbstractEventLoop: + if cls._loop is None: + cls._loop = asyncio.new_event_loop() + asyncio.set_event_loop(cls._loop) return cls._loop @classmethod def run_start_tasks(cls): provider = get_provider() tasks = provider.get_services(StartupTask) + loop = cls.get_loop() + for task in tasks: if asyncio.iscoroutinefunction(task.run): - cls._loop.run_until_complete(task.run()) + loop.run_until_complete(task.run()) else: task.run() @classmethod - def run(cls, func: Callable, *args, **kwargs): - cls.run_start_tasks() + def run_hosted_services(cls): + provider = get_provider() + services = provider.get_hosted_services() + loop = cls.get_loop() + for service in services: + if asyncio.iscoroutinefunction(service.start): + cls._tasks[service] = loop.create_task(service.start()) + + @classmethod + async def _stop_all(cls): + for service in cls._tasks.keys(): + if asyncio.iscoroutinefunction(service.stop): + await service.stop() + + for task in cls._tasks.values(): + task.cancel() + + cls._tasks.clear() + + @classmethod + def run_app(cls, func: Callable, *args, **kwargs): + loop = cls.get_loop() + + cls.run_start_tasks() + cls.run_hosted_services() + + async def runner(): + try: + if asyncio.iscoroutinefunction(func): + app_task = asyncio.create_task(func(*args, **kwargs)) + else: + loop = asyncio.get_running_loop() + app_task = loop.run_in_executor(None, func, *args, **kwargs) + + await asyncio.wait( + [app_task, *cls._tasks.values()], + return_when=asyncio.FIRST_COMPLETED, + ) + except (KeyboardInterrupt, asyncio.CancelledError): + pass + finally: + await cls._stop_all() + + loop.run_until_complete(runner()) + + @classmethod + def run(cls, func: Callable, *args, **kwargs): if asyncio.iscoroutinefunction(func): return cls._loop.run_until_complete(func(*args, **kwargs)) - return func(*args, **kwargs) + return func(*args, **kwargs) \ No newline at end of file diff --git a/src/cpl-dependency/cpl/dependency/hosted/__init__.py b/src/cpl-dependency/cpl/dependency/hosted/__init__.py index e69de29b..ab2f568e 100644 --- a/src/cpl-dependency/cpl/dependency/hosted/__init__.py +++ b/src/cpl-dependency/cpl/dependency/hosted/__init__.py @@ -0,0 +1,2 @@ +from .hosted_service import HostedService +from .startup_task import StartupTask \ No newline at end of file diff --git a/src/cpl-dependency/cpl/dependency/hosted/hosted_service.py b/src/cpl-dependency/cpl/dependency/hosted/hosted_service.py new file mode 100644 index 00000000..9d6d3bab --- /dev/null +++ b/src/cpl-dependency/cpl/dependency/hosted/hosted_service.py @@ -0,0 +1,9 @@ +from abc import ABC, abstractmethod + + +class HostedService(ABC): + @abstractmethod + async def start(self): ... + + @abstractmethod + async def stop(self): ... \ No newline at end of file diff --git a/src/cpl-dependency/cpl/dependency/service_collection.py b/src/cpl-dependency/cpl/dependency/service_collection.py index a63b263a..0ab9e17b 100644 --- a/src/cpl-dependency/cpl/dependency/service_collection.py +++ b/src/cpl-dependency/cpl/dependency/service_collection.py @@ -66,6 +66,10 @@ class ServiceCollection: self.add_singleton(StartupTask, task) return self + def add_hosted_service(self, service_type: T, service: Service = None) -> Self: + self._add_descriptor_by_lifetime(service_type, ServiceLifetimeEnum.hosted, service) + return self + def build(self) -> ServiceProvider: sp = ServiceProvider(self._service_descriptors) return sp diff --git a/src/cpl-dependency/cpl/dependency/service_lifetime.py b/src/cpl-dependency/cpl/dependency/service_lifetime.py index 28c3aa5d..ed23a2a9 100644 --- a/src/cpl-dependency/cpl/dependency/service_lifetime.py +++ b/src/cpl-dependency/cpl/dependency/service_lifetime.py @@ -5,3 +5,4 @@ class ServiceLifetimeEnum(Enum): singleton = auto() scoped = auto() transient = auto() + hosted = auto() diff --git a/src/cpl-dependency/cpl/dependency/service_provider.py b/src/cpl-dependency/cpl/dependency/service_provider.py index 180ad4cc..1ba688d2 100644 --- a/src/cpl-dependency/cpl/dependency/service_provider.py +++ b/src/cpl-dependency/cpl/dependency/service_provider.py @@ -7,7 +7,7 @@ from typing import Optional, Type from cpl.core.configuration import Configuration from cpl.core.configuration.configuration_model_abc import ConfigurationModelABC from cpl.core.environment import Environment -from cpl.core.typing import T, R, Source +from cpl.core.typing import T, Source from cpl.dependency import use_provider from cpl.dependency.service_descriptor import ServiceDescriptor from cpl.dependency.service_lifetime import ServiceLifetimeEnum @@ -54,9 +54,7 @@ class ServiceProvider: return implementation - # raise Exception(f'Service {parameter.annotation} not found') - - def _get_services(self, t: type, service_type: type = None, **kwargs) -> list[Optional[object]]: + def _get_services(self, t: type, *args, service_type: type = None, **kwargs) -> list[Optional[object]]: implementations = [] for descriptor in self._service_descriptors: if descriptor.service_type == t or issubclass(descriptor.service_type, t): @@ -65,7 +63,7 @@ class ServiceProvider: continue implementation = self._build_service( - descriptor.service_type, origin_service_type=service_type, **kwargs + descriptor.service_type, *args, origin_service_type=service_type, **kwargs ) if descriptor.lifetime in (ServiceLifetimeEnum.singleton, ServiceLifetimeEnum.scoped): descriptor.implementation = implementation @@ -74,7 +72,7 @@ class ServiceProvider: return implementations - def _build_by_signature(self, sig: Signature, origin_service_type: type = None) -> list[R]: + def _build_by_signature(self, sig: Signature, origin_service_type: type = None) -> list[T]: params = [] for param in sig.parameters.items(): parameter = param[1] @@ -132,7 +130,11 @@ class ServiceProvider: with use_provider(scoped_provider): yield scoped_provider - def get_service(self, service_type: T, *args, **kwargs) -> Optional[R]: + def get_hosted_services(self) -> list[Optional[T]]: + hosted_services = [self.get_service(d.service_type) for d in self._service_descriptors if d.lifetime == ServiceLifetimeEnum.hosted] + return hosted_services + + def get_service(self, service_type: Type[T], *args, **kwargs) -> Optional[T]: result = self._find_service(service_type) if result is None: return None @@ -155,11 +157,11 @@ class ServiceProvider: return descriptor.service_type return None - def get_services(self, service_type: T, *args, **kwargs) -> list[Optional[R]]: + def get_services(self, service_type: Type[T], *args, **kwargs) -> list[Optional[T]]: implementations = [] if typing.get_origin(service_type) == list: raise Exception(f"Invalid type {service_type}! Expected single type not list of type") - implementations.extend(self._get_services(service_type)) + implementations.extend(self._get_services(service_type, None, *args, **kwargs)) return implementations def get_service_types(self, service_type: Type[T]) -> list[Type[T]]: