Recursive complex filtering #181
This commit is contained in:
@@ -1,6 +1,8 @@
|
||||
from typing import Optional, Any
|
||||
|
||||
import sqlparse
|
||||
import asyncio
|
||||
|
||||
from mysql.connector import errors, PoolError
|
||||
from mysql.connector.aio import MySQLConnectionPool
|
||||
|
||||
from cpl.core.environment import Environment
|
||||
@@ -10,7 +12,6 @@ from cpl.dependency.context import get_provider
|
||||
|
||||
|
||||
class MySQLPool:
|
||||
|
||||
def __init__(self, database_settings: DatabaseSettings):
|
||||
self._dbconfig = {
|
||||
"host": database_settings.host,
|
||||
@@ -25,59 +26,87 @@ class MySQLPool:
|
||||
"ssl_disabled": database_settings.ssl_disabled,
|
||||
}
|
||||
self._pool: Optional[MySQLConnectionPool] = None
|
||||
self._pool_lock = asyncio.Lock()
|
||||
|
||||
async def _get_pool(self):
|
||||
async def _get_pool(self) -> MySQLConnectionPool:
|
||||
if self._pool is None:
|
||||
try:
|
||||
self._pool = MySQLConnectionPool(
|
||||
pool_name="mypool", pool_size=Environment.get("DB_POOL_SIZE", int, 1), **self._dbconfig
|
||||
)
|
||||
await self._pool.initialize_pool()
|
||||
async with self._pool_lock:
|
||||
if self._pool is None:
|
||||
try:
|
||||
self._pool = MySQLConnectionPool(
|
||||
pool_name="cplpool",
|
||||
pool_size=Environment.get("DB_POOL_SIZE", int, 20),
|
||||
**self._dbconfig,
|
||||
)
|
||||
await self._pool.initialize_pool()
|
||||
|
||||
con = await self._pool.get_connection()
|
||||
async with await con.cursor() as cursor:
|
||||
await cursor.execute("SELECT 1")
|
||||
await cursor.fetchall()
|
||||
|
||||
await con.close()
|
||||
except Exception as e:
|
||||
logger = get_provider().get_service(DBLogger)
|
||||
logger.fatal(f"Error connecting to the database", e)
|
||||
# Testverbindung (Ping)
|
||||
con = await self._pool.get_connection()
|
||||
try:
|
||||
async with await con.cursor() as cursor:
|
||||
await cursor.execute("SELECT 1")
|
||||
await cursor.fetchall()
|
||||
finally:
|
||||
await con.close()
|
||||
|
||||
except Exception as e:
|
||||
logger = get_provider().get_service(DBLogger)
|
||||
logger.fatal("Error connecting to the database", e)
|
||||
raise
|
||||
return self._pool
|
||||
|
||||
async def _get_connection(self, retries: int = 3, delay: float = 0.5):
|
||||
"""Stabiler Connection-Getter mit Retry und Ping"""
|
||||
pool = await self._get_pool()
|
||||
|
||||
for attempt in range(retries):
|
||||
try:
|
||||
con = await pool.get_connection()
|
||||
|
||||
# Verbindungs-Check (Ping)
|
||||
try:
|
||||
async with await con.cursor() as cursor:
|
||||
await cursor.execute("SELECT 1")
|
||||
await cursor.fetchall()
|
||||
except errors.OperationalError:
|
||||
await con.close()
|
||||
raise
|
||||
|
||||
return con
|
||||
|
||||
except PoolError:
|
||||
if attempt == retries - 1:
|
||||
raise
|
||||
await asyncio.sleep(delay)
|
||||
|
||||
@staticmethod
|
||||
async def _exec_sql(cursor: Any, query: str, args=None, multi=True):
|
||||
result = []
|
||||
if multi:
|
||||
queries = [str(stmt).strip() for stmt in sqlparse.parse(query) if str(stmt).strip()]
|
||||
for q in queries:
|
||||
if q.strip() == "":
|
||||
continue
|
||||
await cursor.execute(q, args)
|
||||
if cursor.description is not None:
|
||||
result = await cursor.fetchall()
|
||||
if q:
|
||||
await cursor.execute(q, args)
|
||||
if cursor.description is not None:
|
||||
result = await cursor.fetchall()
|
||||
else:
|
||||
await cursor.execute(query, args)
|
||||
if cursor.description is not None:
|
||||
result = await cursor.fetchall()
|
||||
|
||||
return result
|
||||
|
||||
async def execute(self, query: str, args=None, multi=True) -> list[list]:
|
||||
pool = await self._get_pool()
|
||||
con = await pool.get_connection()
|
||||
async def execute(self, query: str, args=None, multi=True) -> list[str]:
|
||||
con = await self._get_connection()
|
||||
try:
|
||||
async with await con.cursor() as cursor:
|
||||
result = await self._exec_sql(cursor, query, args, multi)
|
||||
res = await self._exec_sql(cursor, query, args, multi)
|
||||
await con.commit()
|
||||
return result
|
||||
return list(res)
|
||||
finally:
|
||||
await con.close()
|
||||
|
||||
async def select(self, query: str, args=None, multi=True) -> list[str]:
|
||||
pool = await self._get_pool()
|
||||
con = await pool.get_connection()
|
||||
con = await self._get_connection()
|
||||
try:
|
||||
async with await con.cursor() as cursor:
|
||||
res = await self._exec_sql(cursor, query, args, multi)
|
||||
@@ -86,8 +115,7 @@ class MySQLPool:
|
||||
await con.close()
|
||||
|
||||
async def select_map(self, query: str, args=None, multi=True) -> list[dict]:
|
||||
pool = await self._get_pool()
|
||||
con = await pool.get_connection()
|
||||
con = await self._get_connection()
|
||||
try:
|
||||
async with await con.cursor(dictionary=True) as cursor:
|
||||
res = await self._exec_sql(cursor, query, args, multi)
|
||||
|
||||
@@ -27,7 +27,7 @@ class PostgresPool:
|
||||
self._pool: Optional[AsyncConnectionPool] = None
|
||||
|
||||
async def _get_pool(self):
|
||||
if self._pool is None:
|
||||
if self._pool is None or self._pool.closed:
|
||||
pool = AsyncConnectionPool(
|
||||
conninfo=self._conninfo, open=False, min_size=1, max_size=Environment.get("DB_POOL_SIZE", int, 1)
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user