Improved wrapped logging #187
All checks were successful
Test before pr merge / test-lint (pull_request) Successful in 6s
All checks were successful
Test before pr merge / test-lint (pull_request) Successful in 6s
This commit is contained in:
@@ -6,13 +6,13 @@ from cpl.auth.schema._administration.auth_user import AuthUser
|
||||
_user_context: ContextVar[Optional[AuthUser]] = ContextVar("user", default=None)
|
||||
|
||||
|
||||
def set_user(user_id: Optional[AuthUser]):
|
||||
def set_user(user: Optional[AuthUser]):
|
||||
from cpl.dependency.service_provider_abc import ServiceProviderABC
|
||||
from cpl.core.log.logger_abc import LoggerABC
|
||||
|
||||
logger = ServiceProviderABC.get_global_service(LoggerABC)
|
||||
logger.trace("Setting user context", user_id)
|
||||
_user_context.set(user_id)
|
||||
logger.trace("Setting user context", user.id)
|
||||
_user_context.set(user)
|
||||
|
||||
|
||||
def get_user() -> Optional[AuthUser]:
|
||||
|
||||
@@ -1,9 +1,13 @@
|
||||
import asyncio
|
||||
import importlib.util
|
||||
import json
|
||||
import traceback
|
||||
from datetime import datetime
|
||||
|
||||
from cpl.core.log import Logger, LogLevel
|
||||
from starlette.requests import Request
|
||||
|
||||
from cpl.core.log.log_level import LogLevel
|
||||
from cpl.core.log.logger import Logger
|
||||
from cpl.core.typing import Source, Messages
|
||||
|
||||
|
||||
@@ -20,8 +24,9 @@ class StructuredLogger(Logger):
|
||||
try:
|
||||
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
|
||||
formatted_message = self._format_message(level.value, timestamp, *messages)
|
||||
structured_message = self._get_structured_message(level.value, timestamp, formatted_message)
|
||||
|
||||
self._write_log_to_file(level, formatted_message)
|
||||
self._write_log_to_file(level, structured_message)
|
||||
self._write_to_console(level, formatted_message)
|
||||
except Exception as e:
|
||||
print(f"Error while logging: {e} -> {traceback.format_exc()}")
|
||||
@@ -37,10 +42,31 @@ class StructuredLogger(Logger):
|
||||
self._enrich_message_with_request(structured_message)
|
||||
self._enrich_message_with_user(structured_message)
|
||||
|
||||
return str(structured_message)
|
||||
return json.dumps(structured_message, ensure_ascii=False)
|
||||
|
||||
@staticmethod
|
||||
def _enrich_message_with_request(message: dict):
|
||||
def _scope_to_json(request: Request, include_headers: bool = False) -> dict:
|
||||
scope = dict(request.scope)
|
||||
|
||||
def convert(value):
|
||||
if isinstance(value, bytes):
|
||||
return value.decode("utf-8")
|
||||
if isinstance(value, (list, tuple)):
|
||||
return [convert(v) for v in value]
|
||||
if isinstance(value, dict):
|
||||
return {str(k): convert(v) for k, v in value.items()}
|
||||
if not isinstance(value, (str, int, float, bool, type(None))):
|
||||
return str(value)
|
||||
return value
|
||||
|
||||
serializable_scope = {str(k): convert(v) for k, v in scope.items()}
|
||||
|
||||
if not include_headers and "headers" in serializable_scope:
|
||||
serializable_scope["headers"] = "<omitted>"
|
||||
|
||||
return serializable_scope
|
||||
|
||||
def _enrich_message_with_request(self, message: dict):
|
||||
if importlib.util.find_spec("cpl.api") is None:
|
||||
return
|
||||
|
||||
@@ -55,7 +81,7 @@ class StructuredLogger(Logger):
|
||||
message["request"] = {
|
||||
"url": str(request.url),
|
||||
"method": request.method,
|
||||
"scope": request.scope,
|
||||
"scope": self._scope_to_json(request),
|
||||
}
|
||||
if isinstance(request, Request) and request.scope == "http":
|
||||
request: Request = request # fix typing for IDEs
|
||||
@@ -73,8 +99,13 @@ class StructuredLogger(Logger):
|
||||
if user is None:
|
||||
return
|
||||
|
||||
from cpl.dependency.service_provider_abc import ServiceProviderABC
|
||||
from cpl.auth.keycloak.keycloak_admin import KeycloakAdmin
|
||||
|
||||
keycloak = ServiceProviderABC.get_global_service(KeycloakAdmin)
|
||||
kc_user = keycloak.get_user(user.keycloak_id)
|
||||
message["user"] = {
|
||||
"id": str(user.id),
|
||||
"username": user.username,
|
||||
"email": user.email,
|
||||
"username": kc_user.get("username"),
|
||||
"email": kc_user.get("email"),
|
||||
}
|
||||
|
||||
@@ -1,13 +1,28 @@
|
||||
import inspect
|
||||
|
||||
from cpl.core.log import LoggerABC, LogLevel
|
||||
from cpl.core.typing import Messages
|
||||
from cpl.core.typing import Messages, Source
|
||||
from cpl.dependency.service_provider_abc import ServiceProviderABC
|
||||
|
||||
|
||||
class WrappedLogger(LoggerABC):
|
||||
|
||||
def __init__(self, logger: LoggerABC):
|
||||
def __init__(self, file_prefix: str):
|
||||
LoggerABC.__init__(self)
|
||||
assert isinstance(logger, LoggerABC), "logger must be an instance of LoggerABC"
|
||||
self._logger = logger
|
||||
assert file_prefix is not None and file_prefix != "", "file_prefix must be a non-empty string"
|
||||
|
||||
t_logger = ServiceProviderABC.get_global_service(LoggerABC)
|
||||
self._t_logger = type(t_logger) if t_logger is not None else None
|
||||
self._source = None
|
||||
self._file_prefix = file_prefix
|
||||
|
||||
self._set_logger()
|
||||
|
||||
def _set_logger(self):
|
||||
if self._t_logger is None:
|
||||
raise Exception("No LoggerABC service registered in ServiceProviderABC")
|
||||
|
||||
self._logger = self._t_logger(self._source, self._file_prefix)
|
||||
|
||||
def set_level(self, level: LogLevel):
|
||||
self._logger.set_level(level)
|
||||
@@ -15,23 +30,68 @@ class WrappedLogger(LoggerABC):
|
||||
def _format_message(self, level: str, timestamp, *messages: Messages) -> str:
|
||||
return self._logger._format_message(level, timestamp, *messages)
|
||||
|
||||
@staticmethod
|
||||
def _get_source() -> str | None:
|
||||
stack = inspect.stack()
|
||||
if len(stack) <= 1:
|
||||
return None
|
||||
|
||||
from cpl.dependency import ServiceCollection
|
||||
|
||||
ignore_classes = [
|
||||
ServiceProviderABC,
|
||||
ServiceProviderABC.__subclasses__(),
|
||||
ServiceCollection,
|
||||
WrappedLogger,
|
||||
WrappedLogger.__subclasses__(),
|
||||
]
|
||||
|
||||
ignore_modules = [x.__module__ for x in ignore_classes if isinstance(x, type)]
|
||||
|
||||
for i, frame_info in enumerate(stack[1:]):
|
||||
module = inspect.getmodule(frame_info.frame)
|
||||
if module is None:
|
||||
continue
|
||||
|
||||
if module.__name__ in ignore_classes or module in ignore_classes:
|
||||
continue
|
||||
|
||||
if module in ignore_modules or module.__name__ in ignore_modules:
|
||||
continue
|
||||
|
||||
if module.__name__ != __name__:
|
||||
return module.__name__
|
||||
|
||||
return None
|
||||
|
||||
def _set_source(self):
|
||||
self._source = self._get_source()
|
||||
self._set_logger()
|
||||
|
||||
def header(self, string: str):
|
||||
self._set_source()
|
||||
self._logger.header(string)
|
||||
|
||||
def trace(self, *messages: Messages):
|
||||
self._set_source()
|
||||
self._logger.trace(*messages)
|
||||
|
||||
def debug(self, *messages: Messages):
|
||||
self._set_source()
|
||||
self._logger.debug(*messages)
|
||||
|
||||
def info(self, *messages: Messages):
|
||||
self._set_source()
|
||||
self._logger.info(*messages)
|
||||
|
||||
def warning(self, *messages: Messages):
|
||||
self._set_source()
|
||||
self._logger.warning(*messages)
|
||||
|
||||
def error(self, messages: str, e: Exception = None):
|
||||
self._set_source()
|
||||
self._logger.error(messages, e)
|
||||
|
||||
def fatal(self, messages: str, e: Exception = None):
|
||||
self._set_source()
|
||||
self._logger.fatal(messages, e)
|
||||
|
||||
Reference in New Issue
Block a user