Middleware updated & Fixed mysql pool
All checks were successful
Build on push / prepare (push) Successful in 10s
Build on push / core (push) Successful in 19s
Build on push / query (push) Successful in 18s
Build on push / dependency (push) Successful in 17s
Build on push / database (push) Successful in 15s
Build on push / translation (push) Successful in 18s
Build on push / mail (push) Successful in 19s
Build on push / application (push) Successful in 21s
Build on push / auth (push) Successful in 14s
Build on push / api (push) Successful in 14s

This commit is contained in:
2025-09-21 23:41:25 +02:00
parent ea3055527c
commit 6de4f3c03a
20 changed files with 202 additions and 126 deletions

View File

@@ -1,3 +1,4 @@
import os
from typing import Type
from cpl.application.abc import ApplicationABC as _ApplicationABC
@@ -7,13 +8,19 @@ from . import postgres as _postgres
from .table_manager import TableManager
def _with_migrations(self: _ApplicationABC, *paths: list[str]) -> _ApplicationABC:
def _with_migrations(self: _ApplicationABC, *paths: str | list[str]) -> _ApplicationABC:
from cpl.application.host import Host
from cpl.database.service.migration_service import MigrationService
migration_service = self._services.get_service(MigrationService)
migration_service.with_directory("./scripts")
migration_service.with_directory(os.path.join(os.path.dirname(os.path.abspath(__file__)), "scripts"))
if isinstance(paths, str):
paths = [paths]
for path in paths:
migration_service.with_directory(path)
Host.run(migration_service.migrate)
return self

View File

@@ -156,13 +156,16 @@ class DataAccessObjectABC(ABC, Generic[T_DBM]):
:param dict result: Result from the database
:return:
"""
value_map: dict[str, T] = {}
value_map: dict[str, Any] = {}
db_names = self.__db_names.items()
for db_name, value in result.items():
# Find the attribute name corresponding to the db_name
attr_name = next((k for k, v in self.__db_names.items() if v == db_name), None)
if attr_name:
value_map[attr_name] = self._get_value_from_sql(self.__attributes[attr_name], value)
attr_name = next((k for k, v in db_names if v == db_name), None)
if not attr_name:
continue
value_map[attr_name] = self._get_value_from_sql(self.__attributes[attr_name], value)
return self._model_type(**value_map)

View File

@@ -1,7 +1,7 @@
from typing import Optional, Any
import sqlparse
import aiomysql
from mysql.connector.aio import MySQLConnectionPool
from cpl.core.environment import Environment
from cpl.database.db_logger import DBLogger
@@ -9,97 +9,83 @@ from cpl.database.model import DatabaseSettings
_logger = DBLogger(__name__)
class MySQLPool:
"""
Create a pool when connecting to MySQL, which will decrease the time spent in
requesting connection, creating connection, and closing connection.
"""
def __init__(self, database_settings: DatabaseSettings):
self._db_settings = database_settings
self.pool: Optional[aiomysql.Pool] = None
self._dbconfig = {
"host": database_settings.host,
"port": database_settings.port,
"user": database_settings.user,
"password": database_settings.password,
"database": database_settings.database,
"ssl_disabled": True,
}
self._pool: Optional[MySQLConnectionPool] = None
async def _get_pool(self):
if self.pool is None or self.pool._closed:
if self._pool is None:
self._pool = MySQLConnectionPool(
pool_name="mypool", pool_size=Environment.get("DB_POOL_SIZE", int, 1), **self._dbconfig
)
await self._pool.initialize_pool()
con = await self._pool.get_connection()
try:
self.pool = await aiomysql.create_pool(
host=self._db_settings.host,
port=self._db_settings.port,
user=self._db_settings.user,
password=self._db_settings.password,
db=self._db_settings.database,
minsize=1,
maxsize=Environment.get("DB_POOL_SIZE", int, 1),
)
async with await con.cursor() as cursor:
await cursor.execute("SELECT 1")
await cursor.fetchall()
except Exception as e:
_logger.fatal("Failed to connect to the database", e)
raise
return self.pool
_logger.fatal(f"Error connecting to the database: {e}")
finally:
await con.close()
return self._pool
@staticmethod
async def _exec_sql(cursor: Any, query: str, args=None, multi=True):
result = []
if multi:
queries = [str(stmt).strip() for stmt in sqlparse.parse(query) if str(stmt).strip()]
for q in queries:
if q.strip() == "":
continue
await cursor.execute(q, args)
if cursor.description is not None:
result = await cursor.fetchall()
else:
await cursor.execute(query, args)
if cursor.description is not None:
result = await cursor.fetchall()
return result
async def execute(self, query: str, args=None, multi=True) -> list[list]:
"""
Execute a SQL statement, it could be with args and without args. The usage is
similar to the execute() function in aiomysql.
:param query: SQL clause
:param args: args needed by the SQL clause
:param multi: if the query is a multi-statement
:return: return result
"""
pool = await self._get_pool()
async with pool.acquire() as con:
async with con.cursor() as cursor:
await self._exec_sql(cursor, query, args, multi)
con = await pool.get_connection()
try:
async with await con.cursor() as cursor:
result = await self._exec_sql(cursor, query, args, multi)
await con.commit()
if cursor.description is not None: # Query returns rows
res = await cursor.fetchall()
if res is None:
return []
return [list(row) for row in res]
else:
return []
return result
finally:
await con.close()
async def select(self, query: str, args=None, multi=True) -> list[str]:
"""
Execute a SQL statement, it could be with args and without args. The usage is
similar to the execute() function in aiomysql.
:param query: SQL clause
:param args: args needed by the SQL clause
:param multi: if the query is a multi-statement
:return: return result
"""
pool = await self._get_pool()
async with pool.acquire() as con:
async with con.cursor() as cursor:
await self._exec_sql(cursor, query, args, multi)
res = await cursor.fetchall()
con = await pool.get_connection()
try:
async with await con.cursor() as cursor:
res = await self._exec_sql(cursor, query, args, multi)
return list(res)
finally:
await con.close()
async def select_map(self, query: str, args=None, multi=True) -> list[dict]:
"""
Execute a SQL statement, it could be with args and without args. The usage is
similar to the execute() function in aiomysql.
:param query: SQL clause
:param args: args needed by the SQL clause
:param multi: if the query is a multi-statement
:return: return result
"""
pool = await self._get_pool()
async with pool.acquire() as con:
async with con.cursor(aiomysql.DictCursor) as cursor:
await self._exec_sql(cursor, query, args, multi)
res = await cursor.fetchall()
con = await pool.get_connection()
try:
async with await con.cursor(dictionary=True) as cursor:
res = await self._exec_sql(cursor, query, args, multi)
return list(res)
finally:
await con.close()

View File

@@ -25,21 +25,23 @@ class PostgresPool:
f"password={database_settings.password} "
f"dbname={database_settings.database}"
)
self.pool: Optional[AsyncConnectionPool] = None
self._pool: Optional[AsyncConnectionPool] = None
async def _get_pool(self):
pool = AsyncConnectionPool(
conninfo=self._conninfo, open=False, min_size=1, max_size=Environment.get("DB_POOL_SIZE", int, 1)
)
await pool.open()
try:
async with pool.connection() as con:
await pool.check_connection(con)
except PoolTimeout as e:
await pool.close()
_logger.fatal(f"Failed to connect to the database", e)
return pool
if self._pool is None:
pool = AsyncConnectionPool(
conninfo=self._conninfo, open=False, min_size=1, max_size=Environment.get("DB_POOL_SIZE", int, 1)
)
await pool.open()
try:
async with pool.connection() as con:
await pool.check_connection(con)
except PoolTimeout as e:
await pool.close()
_logger.fatal(f"Failed to connect to the database", e)
self._pool = pool
return self._pool
@staticmethod
async def _exec_sql(cursor: Any, query: str, args=None, multi=True):