import time
from typing import Optional

from cpl.application.abc import ApplicationABC
from cpl.core.configuration import Configuration
from cpl.core.console import Console
from cpl.dependency import ServiceProvider
from cpl.core.environment import Environment
from cpl.core.log import LoggerABC
from cpl.core.pipes import IPAddressPipe
from cpl.mail import EMail, EMailClientABC
from cpl.query import List

from scoped_service import ScopedService
from test_service import TestService
from test_settings import TestSettings


class Application(ApplicationABC):
    """Demo application exercising logging, console output, DI scopes,
    configuration reloading and (optionally) mail sending."""

    def __init__(self, services: ServiceProvider):
        """Resolve the logger and the mail client from the service provider.

        Args:
            services: Service provider handed to the ``ApplicationABC`` base
                class and used for the lookups below.
        """
        super().__init__(services)
        self._logger = self._services.get_service(LoggerABC)
        self._mailer = self._services.get_service(EMailClientABC)

    def test_send_mail(self):
        """Build a plain-text test mail and send it through the mail client."""
        mail = EMail()
        mail.add_header("Mime-Version: 1.0")
        mail.add_header("Content-Type: text/plain; charset=utf-8")
        mail.add_header("Content-Transfer-Encoding: quoted-printable")
        mail.add_receiver("sven.heidemann@sh-edraft.de")
        mail.subject = f"Test - {Environment.get_host_name()}"
        mail.body = "Dies ist ein Test :D"
        self._mailer.send_mail(mail)

    @staticmethod
    def _wait(time_ms: int):
        """Block the calling thread for ``time_ms`` milliseconds.

        Args:
            time_ms: Duration to sleep, in milliseconds.
        """
        # time.sleep() expects seconds, so convert from milliseconds.
        time.sleep(time_ms / 1000)

    def main(self):
        """Run the demo: log environment info, show console helpers, verify
        service lifetimes and reload the configuration files.

        Raises:
            Exception: If one of the scoped-service identity checks fails.
        """
        self._logger.debug(f"Host: {Environment.get_host_name()}")
        self._logger.debug(f"Environment: {Environment.get_environment()}")

        Console.write_line(List(range(0, 10)).select(lambda x: f"x={x}").to_list())
        # 2000 ms keeps the spinner visible for two seconds.
        Console.spinner("Test", self._wait, 2000, spinner_foreground_color="red")

        # Resolve both services twice and compare the instances.
        # NOTE(review): presumably TestService is registered as a singleton and
        # IPAddressPipe as transient - confirm against the startup registration.
        test: TestService = self._services.get_service(TestService)
        ip_pipe: IPAddressPipe = self._services.get_service(IPAddressPipe)
        test.run()
        test2: TestService = self._services.get_service(TestService)
        ip_pipe2: IPAddressPipe = self._services.get_service(IPAddressPipe)
        Console.write_line(f"DI working: {test == test2 and ip_pipe != ip_pipe2}")
        Console.write_line(self._services.get_service(LoggerABC))

        root_scoped_service = self._services.get_service(ScopedService)
        with self._services.create_scope() as scope:
            s_srvc1 = scope.get_service(ScopedService)
            s_srvc2 = scope.get_service(ScopedService)

            Console.write_line(root_scoped_service)
            Console.write_line(s_srvc1)
            Console.write_line(s_srvc2)

            # The instance from the root provider must differ from the one
            # resolved inside the scope ...
            if root_scoped_service == s_srvc1:
                raise Exception("Root scoped service should not be equal to scoped service")
            # ... while two lookups within the same scope must match.
            if s_srvc1 != s_srvc2:
                raise Exception("Scoped services resolved from the same scope should be equal")

        root_scoped_service2 = self._services.get_service(ScopedService)
        Console.write_line(root_scoped_service2)
        # The check fails when both root lookups return the same instance, so
        # the message states exactly that expectation.
        if root_scoped_service == root_scoped_service2:
            raise Exception("Root scoped service should not be equal to root scoped service 2")

        test_settings = Configuration.get(TestSettings)
        Console.write_line(test_settings.value)

        Console.write_line("reload config")
        Configuration.add_json_file("appsettings.json")
        Configuration.add_json_file(f"appsettings.{Environment.get_environment()}.json")
        Configuration.add_json_file(f"appsettings.{Environment.get_host_name()}.json", optional=True)
        test_settings1 = Configuration.get(TestSettings)
        Console.write_line(test_settings1.value)

        # Mail sending is disabled by default; enable manually to test it.
        # self.test_send_mail()