Added structured and wrapped logger #187
All checks were successful
Test before pr merge / test-lint (pull_request) Successful in 5s
All checks were successful
Test before pr merge / test-lint (pull_request) Successful in 5s
This commit is contained in:
@@ -18,16 +18,12 @@ from cpl.database.typing import T_DBM, Attribute, AttributeFilters, AttributeSor
|
||||
class DataAccessObjectABC(ABC, Generic[T_DBM]):
|
||||
|
||||
@abstractmethod
|
||||
def __init__(self, source: str, model_type: Type[T_DBM], table_name: str):
|
||||
def __init__(self, model_type: Type[T_DBM], table_name: str):
|
||||
from cpl.dependency.service_provider_abc import ServiceProviderABC
|
||||
|
||||
self._db = ServiceProviderABC.get_global_service(DBContextABC)
|
||||
|
||||
self._logger = DBLogger(source)
|
||||
self._model_type = model_type
|
||||
self._table_name = table_name
|
||||
|
||||
self._logger = DBLogger(source)
|
||||
self._logger = ServiceProviderABC.get_global_service(DBLogger)
|
||||
self._model_type = model_type
|
||||
self._table_name = table_name
|
||||
|
||||
|
||||
@@ -10,8 +10,8 @@ from cpl.database.abc.db_model_abc import DbModelABC
|
||||
class DbModelDaoABC[T_DBM](DataAccessObjectABC[T_DBM]):
|
||||
|
||||
@abstractmethod
|
||||
def __init__(self, source: str, model_type: Type[T_DBM], table_name: str):
|
||||
DataAccessObjectABC.__init__(self, source, model_type, table_name)
|
||||
def __init__(self, model_type: Type[T_DBM], table_name: str):
|
||||
DataAccessObjectABC.__init__(self, model_type, table_name)
|
||||
|
||||
self.attribute(DbModelABC.id, int, ignore=True)
|
||||
self.attribute(DbModelABC.deleted, bool)
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
from cpl.core.log import Logger
|
||||
from cpl.core.typing import Source
|
||||
from cpl.core.log import LoggerABC
|
||||
from cpl.core.log.wrapped_logger import WrappedLogger
|
||||
|
||||
|
||||
class DBLogger(Logger):
|
||||
class DBLogger(WrappedLogger):
|
||||
|
||||
def __init__(self, source: Source):
|
||||
Logger.__init__(self, source, "db")
|
||||
def __init__(self, logger: LoggerABC):
|
||||
WrappedLogger.__init__(self, logger)
|
||||
|
||||
@@ -4,18 +4,17 @@ from typing import Any, List, Dict, Tuple, Union
|
||||
from mysql.connector import Error as MySQLError, PoolError
|
||||
|
||||
from cpl.core.configuration import Configuration
|
||||
from cpl.core.environment import Environment
|
||||
from cpl.database.abc.db_context_abc import DBContextABC
|
||||
from cpl.database.db_logger import DBLogger
|
||||
from cpl.database.model.database_settings import DatabaseSettings
|
||||
from cpl.database.mysql.mysql_pool import MySQLPool
|
||||
|
||||
_logger = DBLogger(__name__)
|
||||
|
||||
|
||||
class DBContext(DBContextABC):
|
||||
def __init__(self):
|
||||
def __init__(self, logger: DBLogger):
|
||||
DBContextABC.__init__(self)
|
||||
self._logger = logger
|
||||
|
||||
self._pool: MySQLPool = None
|
||||
self._fails = 0
|
||||
|
||||
@@ -23,62 +22,62 @@ class DBContext(DBContextABC):
|
||||
|
||||
def connect(self, database_settings: DatabaseSettings):
|
||||
try:
|
||||
_logger.debug("Connecting to database")
|
||||
self._logger.debug("Connecting to database")
|
||||
self._pool = MySQLPool(
|
||||
database_settings,
|
||||
)
|
||||
_logger.info("Connected to database")
|
||||
self._logger.info("Connected to database")
|
||||
except Exception as e:
|
||||
_logger.fatal("Connecting to database failed", e)
|
||||
self._logger.fatal("Connecting to database failed", e)
|
||||
|
||||
async def execute(self, statement: str, args=None, multi=True) -> List[List]:
|
||||
_logger.trace(f"execute {statement} with args: {args}")
|
||||
self._logger.trace(f"execute {statement} with args: {args}")
|
||||
return await self._pool.execute(statement, args, multi)
|
||||
|
||||
async def select_map(self, statement: str, args=None) -> List[Dict]:
|
||||
_logger.trace(f"select {statement} with args: {args}")
|
||||
self._logger.trace(f"select {statement} with args: {args}")
|
||||
try:
|
||||
return await self._pool.select_map(statement, args)
|
||||
except (MySQLError, PoolError) as e:
|
||||
if self._fails >= 3:
|
||||
_logger.error(f"Database error caused by `{statement}`", e)
|
||||
self._logger.error(f"Database error caused by `{statement}`", e)
|
||||
uid = uuid.uuid4()
|
||||
raise Exception(
|
||||
f"Query failed three times with {type(e).__name__}. Contact an admin with the UID: {uid}"
|
||||
)
|
||||
|
||||
_logger.error(f"Database error caused by `{statement}`", e)
|
||||
self._logger.error(f"Database error caused by `{statement}`", e)
|
||||
self._fails += 1
|
||||
try:
|
||||
_logger.debug("Retry select")
|
||||
self._logger.debug("Retry select")
|
||||
return await self.select_map(statement, args)
|
||||
except Exception as e:
|
||||
pass
|
||||
return []
|
||||
except Exception as e:
|
||||
_logger.error(f"Database error caused by `{statement}`", e)
|
||||
self._logger.error(f"Database error caused by `{statement}`", e)
|
||||
raise e
|
||||
|
||||
async def select(self, statement: str, args=None) -> Union[List[str], List[Tuple], List[Any]]:
|
||||
_logger.trace(f"select {statement} with args: {args}")
|
||||
self._logger.trace(f"select {statement} with args: {args}")
|
||||
try:
|
||||
return await self._pool.select(statement, args)
|
||||
except (MySQLError, PoolError) as e:
|
||||
if self._fails >= 3:
|
||||
_logger.error(f"Database error caused by `{statement}`", e)
|
||||
self._logger.error(f"Database error caused by `{statement}`", e)
|
||||
uid = uuid.uuid4()
|
||||
raise Exception(
|
||||
f"Query failed three times with {type(e).__name__}. Contact an admin with the UID: {uid}"
|
||||
)
|
||||
|
||||
_logger.error(f"Database error caused by `{statement}`", e)
|
||||
self._logger.error(f"Database error caused by `{statement}`", e)
|
||||
self._fails += 1
|
||||
try:
|
||||
_logger.debug("Retry select")
|
||||
self._logger.debug("Retry select")
|
||||
return await self.select(statement, args)
|
||||
except Exception as e:
|
||||
pass
|
||||
return []
|
||||
except Exception as e:
|
||||
_logger.error(f"Database error caused by `{statement}`", e)
|
||||
self._logger.error(f"Database error caused by `{statement}`", e)
|
||||
raise e
|
||||
|
||||
@@ -6,8 +6,7 @@ from mysql.connector.aio import MySQLConnectionPool
|
||||
from cpl.core.environment import Environment
|
||||
from cpl.database.db_logger import DBLogger
|
||||
from cpl.database.model import DatabaseSettings
|
||||
|
||||
_logger = DBLogger(__name__)
|
||||
from cpl.dependency import ServiceProviderABC
|
||||
|
||||
|
||||
class MySQLPool:
|
||||
@@ -36,7 +35,8 @@ class MySQLPool:
|
||||
await cursor.execute("SELECT 1")
|
||||
await cursor.fetchall()
|
||||
except Exception as e:
|
||||
_logger.fatal(f"Error connecting to the database: {e}")
|
||||
logger = ServiceProviderABC.get_global_service(DBLogger)
|
||||
logger.fatal(f"Error connecting to the database: {e}")
|
||||
finally:
|
||||
await con.close()
|
||||
|
||||
|
||||
@@ -7,16 +7,16 @@ from psycopg_pool import PoolTimeout
|
||||
from cpl.core.configuration import Configuration
|
||||
from cpl.core.environment import Environment
|
||||
from cpl.database.abc.db_context_abc import DBContextABC
|
||||
from cpl.database.database_settings import DatabaseSettings
|
||||
from cpl.database.db_logger import DBLogger
|
||||
from cpl.database.model import DatabaseSettings
|
||||
from cpl.database.postgres.postgres_pool import PostgresPool
|
||||
|
||||
_logger = DBLogger(__name__)
|
||||
|
||||
|
||||
class DBContext(DBContextABC):
|
||||
def __init__(self):
|
||||
def __init__(self, logger: DBLogger):
|
||||
DBContextABC.__init__(self)
|
||||
|
||||
self._logger = logger
|
||||
self._pool: PostgresPool = None
|
||||
self._fails = 0
|
||||
|
||||
@@ -24,63 +24,63 @@ class DBContext(DBContextABC):
|
||||
|
||||
def connect(self, database_settings: DatabaseSettings):
|
||||
try:
|
||||
_logger.debug("Connecting to database")
|
||||
self._logger.debug("Connecting to database")
|
||||
self._pool = PostgresPool(
|
||||
database_settings,
|
||||
Environment.get("DB_POOL_SIZE", int, 1),
|
||||
)
|
||||
_logger.info("Connected to database")
|
||||
self._logger.info("Connected to database")
|
||||
except Exception as e:
|
||||
_logger.fatal("Connecting to database failed", e)
|
||||
self._logger.fatal("Connecting to database failed", e)
|
||||
|
||||
async def execute(self, statement: str, args=None, multi=True) -> list[list]:
|
||||
_logger.trace(f"execute {statement} with args: {args}")
|
||||
self._logger.trace(f"execute {statement} with args: {args}")
|
||||
return await self._pool.execute(statement, args, multi)
|
||||
|
||||
async def select_map(self, statement: str, args=None) -> list[dict]:
|
||||
_logger.trace(f"select {statement} with args: {args}")
|
||||
self._logger.trace(f"select {statement} with args: {args}")
|
||||
try:
|
||||
return await self._pool.select_map(statement, args)
|
||||
except (OperationalError, PoolTimeout) as e:
|
||||
if self._fails >= 3:
|
||||
_logger.error(f"Database error caused by `{statement}`", e)
|
||||
self._logger.error(f"Database error caused by `{statement}`", e)
|
||||
uid = uuid.uuid4()
|
||||
raise Exception(
|
||||
f"Query failed three times with {type(e).__name__}. Contact an admin with the UID: {uid}"
|
||||
)
|
||||
|
||||
_logger.error(f"Database error caused by `{statement}`", e)
|
||||
self._logger.error(f"Database error caused by `{statement}`", e)
|
||||
self._fails += 1
|
||||
try:
|
||||
_logger.debug("Retry select")
|
||||
self._logger.debug("Retry select")
|
||||
return await self.select_map(statement, args)
|
||||
except Exception as e:
|
||||
pass
|
||||
return []
|
||||
except Exception as e:
|
||||
_logger.error(f"Database error caused by `{statement}`", e)
|
||||
self._logger.error(f"Database error caused by `{statement}`", e)
|
||||
raise e
|
||||
|
||||
async def select(self, statement: str, args=None) -> list[str] | list[tuple] | list[Any]:
|
||||
_logger.trace(f"select {statement} with args: {args}")
|
||||
self._logger.trace(f"select {statement} with args: {args}")
|
||||
try:
|
||||
return await self._pool.select(statement, args)
|
||||
except (OperationalError, PoolTimeout) as e:
|
||||
if self._fails >= 3:
|
||||
_logger.error(f"Database error caused by `{statement}`", e)
|
||||
self._logger.error(f"Database error caused by `{statement}`", e)
|
||||
uid = uuid.uuid4()
|
||||
raise Exception(
|
||||
f"Query failed three times with {type(e).__name__}. Contact an admin with the UID: {uid}"
|
||||
)
|
||||
|
||||
_logger.error(f"Database error caused by `{statement}`", e)
|
||||
self._logger.error(f"Database error caused by `{statement}`", e)
|
||||
self._fails += 1
|
||||
try:
|
||||
_logger.debug("Retry select")
|
||||
self._logger.debug("Retry select")
|
||||
return await self.select(statement, args)
|
||||
except Exception as e:
|
||||
pass
|
||||
return []
|
||||
except Exception as e:
|
||||
_logger.error(f"Database error caused by `{statement}`", e)
|
||||
self._logger.error(f"Database error caused by `{statement}`", e)
|
||||
raise e
|
||||
|
||||
@@ -7,8 +7,7 @@ from psycopg_pool import AsyncConnectionPool, PoolTimeout
|
||||
from cpl.core.environment import Environment
|
||||
from cpl.database.db_logger import DBLogger
|
||||
from cpl.database.model import DatabaseSettings
|
||||
|
||||
_logger = DBLogger(__name__)
|
||||
from cpl.dependency import ServiceProviderABC
|
||||
|
||||
|
||||
class PostgresPool:
|
||||
@@ -38,7 +37,8 @@ class PostgresPool:
|
||||
await pool.check_connection(con)
|
||||
except PoolTimeout as e:
|
||||
await pool.close()
|
||||
_logger.fatal(f"Failed to connect to the database", e)
|
||||
logger = ServiceProviderABC.get_global_service(DBLogger)
|
||||
logger.fatal(f"Failed to connect to the database", e)
|
||||
self._pool = pool
|
||||
|
||||
return self._pool
|
||||
|
||||
@@ -1,14 +1,11 @@
|
||||
from cpl.database import TableManager
|
||||
from cpl.database.abc.data_access_object_abc import DataAccessObjectABC
|
||||
from cpl.database.db_logger import DBLogger
|
||||
from cpl.database.schema.executed_migration import ExecutedMigration
|
||||
|
||||
_logger = DBLogger(__name__)
|
||||
|
||||
|
||||
class ExecutedMigrationDao(DataAccessObjectABC[ExecutedMigration]):
|
||||
|
||||
def __init__(self):
|
||||
DataAccessObjectABC.__init__(self, __name__, ExecutedMigration, TableManager.get("executed_migrations"))
|
||||
DataAccessObjectABC.__init__(self, ExecutedMigration, TableManager.get("executed_migrations"))
|
||||
|
||||
self.attribute(ExecutedMigration.migration_id, str, primary_key=True, db_name="migrationId")
|
||||
|
||||
@@ -8,12 +8,11 @@ from cpl.database.model.server_type import ServerType, ServerTypes
|
||||
from cpl.database.schema.executed_migration import ExecutedMigration
|
||||
from cpl.database.schema.executed_migration_dao import ExecutedMigrationDao
|
||||
|
||||
_logger = DBLogger(__name__)
|
||||
|
||||
|
||||
class MigrationService:
|
||||
|
||||
def __init__(self, db: DBContextABC, executedMigrationDao: ExecutedMigrationDao):
|
||||
def __init__(self, logger: DBLogger, db: DBContextABC, executedMigrationDao: ExecutedMigrationDao):
|
||||
self._logger = logger
|
||||
self._db = db
|
||||
self._executedMigrationDao = executedMigrationDao
|
||||
|
||||
@@ -96,13 +95,13 @@ class MigrationService:
|
||||
if migration_from_db is not None:
|
||||
continue
|
||||
|
||||
_logger.debug(f"Running upgrade migration: {migration.name}")
|
||||
self._logger.debug(f"Running upgrade migration: {migration.name}")
|
||||
|
||||
await self._db.execute(migration.script, multi=True)
|
||||
|
||||
await self._executedMigrationDao.create(ExecutedMigration(migration.name), skip_editor=True)
|
||||
except Exception as e:
|
||||
_logger.fatal(
|
||||
self._logger.fatal(
|
||||
f"Migration failed: {migration.name}\n{active_statement}",
|
||||
e,
|
||||
)
|
||||
|
||||
@@ -3,16 +3,14 @@ from cpl.database.db_logger import DBLogger
|
||||
from cpl.dependency import ServiceProviderABC
|
||||
|
||||
|
||||
_logger = DBLogger(__name__)
|
||||
|
||||
|
||||
class SeederService:
|
||||
|
||||
def __init__(self, provider: ServiceProviderABC):
|
||||
self._provider = provider
|
||||
self._logger = provider.get_service(DBLogger)
|
||||
|
||||
async def seed(self):
|
||||
seeders = self._provider.get_services(DataSeederABC)
|
||||
_logger.debug(f"Found {len(seeders)} seeders")
|
||||
self._logger.debug(f"Found {len(seeders)} seeders")
|
||||
for seeder in seeders:
|
||||
await seeder.seed()
|
||||
|
||||
Reference in New Issue
Block a user