Compare commits

..

2 Commits

Author SHA1 Message Date
685c20e3bf Added cron jobs as hosted services
All checks were successful
Test before pr merge / test-lint (pull_request) Successful in 6s
Build on push / prepare (push) Successful in 11s
Build on push / core (push) Successful in 18s
Build on push / query (push) Successful in 19s
Build on push / dependency (push) Successful in 14s
Build on push / database (push) Successful in 15s
Build on push / application (push) Successful in 18s
Build on push / mail (push) Successful in 19s
Build on push / translation (push) Successful in 29s
Build on push / auth (push) Successful in 18s
Build on push / api (push) Successful in 18s
2025-10-04 05:38:21 +02:00
e0f6e1c241 Console & file logging format msg seperate & removed timestamp from console & minor fixes to di
All checks were successful
Test before pr merge / test-lint (pull_request) Successful in 7s
Build on push / prepare (push) Successful in 10s
Build on push / core (push) Successful in 18s
Build on push / query (push) Successful in 19s
Build on push / dependency (push) Successful in 14s
Build on push / translation (push) Successful in 16s
Build on push / mail (push) Successful in 17s
Build on push / application (push) Successful in 19s
Build on push / database (push) Successful in 19s
Build on push / auth (push) Successful in 25s
Build on push / api (push) Successful in 14s
2025-09-26 15:48:33 +02:00
21 changed files with 170 additions and 65 deletions

View File

@@ -1,3 +1,4 @@
import asyncio
import time
from cpl.application.abc import ApplicationABC
@@ -7,6 +8,7 @@ from cpl.core.environment import Environment
from cpl.core.log import LoggerABC
from cpl.core.pipes import IPAddressPipe
from cpl.dependency import ServiceProvider
from cpl.dependency.typing import Modules
from cpl.mail import EMail, EMailClientABC
from cpl.query import List
from scoped_service import ScopedService
@@ -16,8 +18,8 @@ from test_settings import TestSettings
class Application(ApplicationABC):
def __init__(self, services: ServiceProvider):
ApplicationABC.__init__(self, services)
def __init__(self, services: ServiceProvider, modules: Modules):
ApplicationABC.__init__(self, services, modules)
self._logger = self._services.get_service(LoggerABC)
self._mailer = self._services.get_service(EMailClientABC)
@@ -35,7 +37,7 @@ class Application(ApplicationABC):
def _wait(time_ms: int):
time.sleep(time_ms)
def main(self):
async def main(self):
self._logger.debug(f"Host: {Environment.get_host_name()}")
self._logger.debug(f"Environment: {Environment.get_environment()}")
Console.write_line(List(range(0, 10)).select(lambda x: f"x={x}").to_list())
@@ -75,7 +77,7 @@ class Application(ApplicationABC):
# self.test_send_mail()
x = 0
while x < 5:
while x < 500:
Console.write_line("Running...")
x += 1
time.sleep(5)
await asyncio.sleep(5)

View File

@@ -1,7 +1,9 @@
import asyncio
import time
from datetime import datetime
from cpl.core.console import Console
from cpl.core.time.cron import Cron
from cpl.dependency.hosted.cronjob import CronjobABC
from cpl.dependency.hosted.hosted_service import HostedService
@@ -17,4 +19,12 @@ class Hosted(HostedService):
async def stop(self):
Console.write_line("Hosted Service Stopped")
self._stopped = True
self._stopped = True
class MyCronJob(CronjobABC):
def __init__(self):
CronjobABC.__init__(self, Cron("*/1 * * * *")) # Every minute
async def loop(self):
Console.write_line(f"[{datetime.now()}] Hello from Cronjob!")

View File

@@ -1,11 +1,10 @@
from cpl import mail
from cpl.application.abc import StartupABC
from cpl.core.configuration import Configuration
from cpl.core.environment import Environment
from cpl.core.pipes import IPAddressPipe
from cpl.dependency import ServiceCollection
from cpl.mail.mail_module import MailModule
from hosted_service import Hosted
from hosted_service import Hosted, MyCronJob
from scoped_service import ScopedService
from test_service import TestService
@@ -26,3 +25,4 @@ class Startup(StartupABC):
services.add_singleton(TestService)
services.add_scoped(ScopedService)
services.add_hosted_service(Hosted)
services.add_hosted_service(MyCronJob)

View File

@@ -114,6 +114,9 @@ class ApplicationABC(ABC):
Host.run_app(self.main)
except KeyboardInterrupt:
pass
finally:
logger = self._services.get_service(LoggerABC)
logger.info("Application shutdown")
@abstractmethod
def main(self): ...

View File

@@ -6,7 +6,6 @@ from cpl.application.abc.application_extension_abc import ApplicationExtensionAB
from cpl.application.abc.startup_abc import StartupABC
from cpl.application.abc.startup_extension_abc import StartupExtensionABC
from cpl.application.host import Host
from cpl.core.errors import dependency_error
from cpl.dependency.context import get_provider, use_root_provider
from cpl.dependency.service_collection import ServiceCollection

View File

@@ -57,14 +57,9 @@ class Host:
async def runner():
try:
if asyncio.iscoroutinefunction(func):
app_task = asyncio.create_task(func(*args, **kwargs))
await func(*args, **kwargs)
else:
app_task = cls.get_loop().run_in_executor(None, func, *args, **kwargs)
await asyncio.wait(
[app_task, *cls._tasks.values()],
return_when=asyncio.FIRST_COMPLETED,
)
func(*args, **kwargs)
except (KeyboardInterrupt, asyncio.CancelledError):
pass
finally:

View File

@@ -93,14 +93,13 @@ class Logger(LoggerABC):
def _log(self, level: LogLevel, *messages: Messages):
try:
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
formatted_message = self._format_message(level.value, timestamp, *messages)
self._write_log_to_file(level, formatted_message)
self._write_to_console(level, formatted_message)
self._write_log_to_file(level, self._file_format_message(level.value, timestamp, *messages))
self._write_to_console(level, self._console_format_message(level.value, timestamp, *messages))
except Exception as e:
print(f"Error while logging: {e} -> {traceback.format_exc()}")
def _format_message(self, level: str, timestamp, *messages: Messages) -> str:
def _file_format_message(self, level: str, timestamp, *messages: Messages) -> str:
if isinstance(messages, tuple):
messages = list(messages)
@@ -119,6 +118,24 @@ class Logger(LoggerABC):
return message
def _console_format_message(self, level: str, timestamp, *messages: Messages) -> str:
if isinstance(messages, tuple):
messages = list(messages)
if not isinstance(messages, list):
messages = [messages]
messages = [str(message) for message in messages if message is not None]
message = f"[{level.upper():^3}]"
message += f" [{self._file_prefix}]"
if self._source is not None:
message += f" - [{self._source}]"
message += f": {' '.join(messages)}"
return message
def header(self, string: str):
self._log(LogLevel.info, string)

View File

@@ -11,7 +11,10 @@ class LoggerABC(ABC):
def set_level(self, level: LogLevel): ...
@abstractmethod
def _format_message(self, level: str, timestamp, *messages: Messages) -> str: ...
def _file_format_message(self, level: str, timestamp, *messages: Messages) -> str: ...
@abstractmethod
def _console_format_message(self, level: str, timestamp, *messages: Messages) -> str: ...
@abstractmethod
def header(self, string: str):

View File

@@ -1,15 +1,13 @@
import asyncio
import importlib.util
import json
import traceback
from datetime import datetime
from starlette.requests import Request
from cpl.core.log.log_level import LogLevel
from cpl.core.log.logger import Logger
from cpl.core.typing import Source, Messages
from cpl.dependency import get_provider
from cpl.dependency.context import get_provider
class StructuredLogger(Logger):
@@ -21,18 +19,7 @@ class StructuredLogger(Logger):
def log_file(self):
return f"logs/{self._file_prefix}_{datetime.now().strftime('%Y-%m-%d')}.jsonl"
def _log(self, level: LogLevel, *messages: Messages):
try:
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
formatted_message = self._format_message(level.value, timestamp, *messages)
structured_message = self._get_structured_message(level.value, timestamp, formatted_message)
self._write_log_to_file(level, structured_message)
self._write_to_console(level, formatted_message)
except Exception as e:
print(f"Error while logging: {e} -> {traceback.format_exc()}")
def _get_structured_message(self, level: str, timestamp: str, messages: str) -> str:
def _file_format_message(self, level: str, timestamp: str, *messages: Messages) -> str:
structured_message = {
"timestamp": timestamp,
"level": level.upper(),

View File

@@ -1,7 +1,7 @@
import inspect
from typing import Type
from cpl.core.log import LoggerABC, LogLevel
from cpl.core.log import LoggerABC, LogLevel, StructuredLogger
from cpl.core.typing import Messages
from cpl.dependency.inject import inject
from cpl.dependency.service_provider import ServiceProvider
@@ -31,8 +31,11 @@ class WrappedLogger(LoggerABC):
def set_level(self, level: LogLevel):
self._logger.set_level(level)
def _format_message(self, level: str, timestamp, *messages: Messages) -> str:
return self._logger._format_message(level, timestamp, *messages)
def _file_format_message(self, level: str, timestamp, *messages: Messages) -> str:
return self._logger._file_format_message(level, timestamp, *messages)
def _console_format_message(self, level: str, timestamp, *messages: Messages) -> str:
return self._logger._console_format_message(level, timestamp, *messages)
@staticmethod
def _get_source() -> str | None:
@@ -48,6 +51,7 @@ class WrappedLogger(LoggerABC):
ServiceCollection,
WrappedLogger,
WrappedLogger.__subclasses__(),
StructuredLogger,
]
ignore_modules = [x.__module__ for x in ignore_classes if isinstance(x, type)]

View File

@@ -1,2 +1,2 @@
from .time_format_settings import TimeFormatSettings
from .time_format_settings_names_enum import TimeFormatSettingsNamesEnum
from .cron import Cron

View File

@@ -0,0 +1,13 @@
from datetime import datetime
import croniter
class Cron:
def __init__(self, cron_expression: str, start_time: datetime = None):
self._cron_expression = cron_expression
self._start_time = start_time or datetime.now()
self._iter = croniter.croniter(cron_expression, self._start_time)
def next(self) -> datetime:
return self._iter.get_next(datetime)

View File

@@ -13,7 +13,7 @@ class TimeFormatSettings(ConfigurationModelABC):
date_time_format: str = None,
date_time_log_format: str = None,
):
ConfigurationModelABC.__init__(self)
ConfigurationModelABC.__init__(self, readonly=False)
self._date_format: Optional[str] = date_format
self._time_format: Optional[str] = time_format
self._date_time_format: Optional[str] = date_time_format

View File

@@ -1,8 +0,0 @@
from enum import Enum
class TimeFormatSettingsNamesEnum(Enum):
date_format = "DateFormat"
time_format = "TimeFormat"
date_time_format = "DateTimeFormat"
date_time_log_format = "DateTimeLogFormat"

View File

@@ -3,3 +3,4 @@ colorama==0.4.6
tabulate==0.9.0
termcolor==3.1.0
pynput==1.8.1
croniter==6.0.0

View File

@@ -21,4 +21,4 @@ class DatabaseSettings(ConfigurationModelABC):
self.option("use_unicode", bool, False)
self.option("buffered", bool, False)
self.option("auth_plugin", str, "caching_sha2_password")
self.option("ssl_disabled", bool, False)
self.option("ssl_disabled", bool, True)

View File

@@ -22,27 +22,27 @@ class MySQLPool:
"use_unicode": database_settings.use_unicode,
"buffered": database_settings.buffered,
"auth_plugin": database_settings.auth_plugin,
"ssl_disabled": False,
"ssl_disabled": database_settings.ssl_disabled,
}
self._pool: Optional[MySQLConnectionPool] = None
async def _get_pool(self):
if self._pool is None:
self._pool = MySQLConnectionPool(
pool_name="mypool", pool_size=Environment.get("DB_POOL_SIZE", int, 1), **self._dbconfig
)
await self._pool.initialize_pool()
con = await self._pool.get_connection()
try:
self._pool = MySQLConnectionPool(
pool_name="mypool", pool_size=Environment.get("DB_POOL_SIZE", int, 1), **self._dbconfig
)
await self._pool.initialize_pool()
con = await self._pool.get_connection()
async with await con.cursor() as cursor:
await cursor.execute("SELECT 1")
await cursor.fetchall()
await con.close()
except Exception as e:
logger = get_provider().get_service(DBLogger)
logger.fatal(f"Error connecting to the database: {e}")
finally:
await con.close()
logger.fatal(f"Error connecting to the database", e)
return self._pool

View File

@@ -7,7 +7,7 @@ from psycopg_pool import AsyncConnectionPool, PoolTimeout
from cpl.core.environment import Environment
from cpl.database.logger import DBLogger
from cpl.database.model import DatabaseSettings
from cpl.dependency import ServiceProvider
from cpl.dependency.context import get_provider
class PostgresPool:
@@ -31,15 +31,16 @@ class PostgresPool:
pool = AsyncConnectionPool(
conninfo=self._conninfo, open=False, min_size=1, max_size=Environment.get("DB_POOL_SIZE", int, 1)
)
await pool.open()
try:
await pool.open()
async with pool.connection() as con:
await pool.check_connection(con)
self._pool = pool
except PoolTimeout as e:
await pool.close()
logger = get_provider().get_service(DBLogger)
logger.fatal(f"Failed to connect to the database", e)
self._pool = pool
return self._pool

View File

@@ -1,2 +1,3 @@
from .hosted_service import HostedService
from .startup_task import StartupTask
from .cronjob import CronjobABC

View File

@@ -0,0 +1,40 @@
import asyncio
from abc import ABC, abstractmethod
from datetime import datetime
from cpl.core.time.cron import Cron
from cpl.dependency.hosted import HostedService
class CronjobABC(HostedService, ABC):
def __init__(self, cron: Cron):
self._cron = cron
self._task: asyncio.Task | None = None
self._running = False
async def start(self):
self._running = True
self._task = asyncio.create_task(self._run_loop())
async def stop(self):
self._running = False
if self._task:
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass
async def _run_loop(self):
while self._running:
next_run = self._cron.next()
now = datetime.now()
delay = (next_run - now).total_seconds()
if delay > 0:
await asyncio.sleep(delay)
if not self._running:
break
await self.loop()
@abstractmethod
async def loop(self): ...

View File

@@ -1,4 +1,5 @@
import copy
import inspect
import typing
from contextlib import contextmanager
from inspect import signature, Parameter, Signature
@@ -77,6 +78,35 @@ class ServiceProvider:
return implementations
def _get_source(self):
stack = inspect.stack()
if len(stack) <= 1:
return None
from cpl.dependency.service_collection import ServiceCollection
ignore_classes = [
ServiceProvider,
ServiceProvider.__subclasses__(),
ServiceCollection,
]
ignore_modules = [x.__module__ for x in ignore_classes if isinstance(x, type)]
for i, frame_info in enumerate(stack[1:]):
module = inspect.getmodule(frame_info.frame)
if module is None:
continue
if module.__name__ in ignore_classes or module in ignore_classes:
continue
if module in ignore_modules or module.__name__ in ignore_modules:
continue
if module.__name__ != __name__:
return module.__name__
def _build_by_signature(self, sig: Signature, origin_service_type: type = None) -> list[T]:
params = []
for param in sig.parameters.items():
@@ -88,7 +118,11 @@ class ServiceProvider:
)
elif parameter.annotation == Source:
params.append(origin_service_type.__name__)
params.append(
origin_service_type.__name__
if inspect.isclass(origin_service_type)
else str(origin_service_type)
)
elif issubclass(parameter.annotation, ServiceProvider):
params.append(self)
@@ -116,6 +150,9 @@ class ServiceProvider:
else:
service_type = descriptor.service_type
if origin_service_type is None:
origin_service_type = self._get_source()
if origin_service_type is None:
origin_service_type = service_type