[WIP] Added native mysql support

This commit is contained in:
2021-11-29 20:55:43 +01:00
parent 7616cd4c69
commit d9f10edbb7
22 changed files with 277 additions and 231 deletions

View File

@@ -1,59 +1,40 @@
from typing import Optional
from sqlalchemy import engine, create_engine
from sqlalchemy.orm import Session, sessionmaker
import mysql.connector as sql
from cpl_core.console.console import Console
from cpl_core.console.foreground_color_enum import ForegroundColorEnum
from cpl_core.database.connection.database_connection_abc import DatabaseConnectionABC
from cpl_core.database.connection.database_connection_abc import \
DatabaseConnectionABC
from cpl_core.database.database_settings import DatabaseSettings
from cpl_core.utils import CredentialManager
from mysql.connector.abstracts import MySQLConnectionAbstract
class DatabaseConnection(DatabaseConnectionABC):
r"""Representation of the database connection
Parameter
---------
database_settings: :class:`cpl_core.database.database_settings.DatabaseSettings`
"""
def __init__(self, database_settings: DatabaseSettings):
def __init__(self):
DatabaseConnectionABC.__init__(self)
self._db_settings = database_settings
self._engine: Optional[engine] = None
self._session: Optional[Session] = None
self._credentials: Optional[str] = None
self._database_server: Optional[MySQLConnectionAbstract] = None
@property
def engine(self) -> engine:
return self._engine
def server(self) -> MySQLConnectionAbstract:
return self._database_server
@property
def session(self) -> Session:
return self._session
def connect(self, connection_string: str):
def connect(self, database_settings: DatabaseSettings):
try:
self._engine = create_engine(connection_string)
if self._db_settings.auth_plugin is not None:
self._engine = create_engine(connection_string, connect_args={'auth_plugin': self._db_settings.auth_plugin})
if self._db_settings.encoding is not None:
self._engine.encoding = self._db_settings.encoding
if self._db_settings.case_sensitive is not None:
self._engine.case_sensitive = self._db_settings.case_sensitive
if self._db_settings.echo is not None:
self._engine.echo = self._db_settings.echo
self._engine.connect()
db_session = sessionmaker(bind=self._engine)
self._session = db_session()
self._database_server = sql.connect(
host=database_settings.host,
user=database_settings.user,
passwd=CredentialManager.decrypt(database_settings.password),
db=database_settings.database,
charset=database_settings.charset,
use_unicode=database_settings.use_unicode,
buffered=database_settings.buffered,
auth_plugin=database_settings.auth_plugin
)
Console.set_foreground_color(ForegroundColorEnum.green)
Console.write_line(f'[{__name__}] Connected to database')
Console.set_foreground_color(ForegroundColorEnum.default)

View File

@@ -1,7 +1,7 @@
from abc import abstractmethod, ABC
from abc import ABC, abstractmethod
from sqlalchemy import engine
from sqlalchemy.orm import Session
from cpl_core.database.database_settings import DatabaseSettings
from mysql.connector.abstracts import MySQLConnectionAbstract
class DatabaseConnectionABC(ABC):
@@ -12,14 +12,10 @@ class DatabaseConnectionABC(ABC):
@property
@abstractmethod
def engine(self) -> engine: pass
@property
def server(self) -> MySQLConnectionAbstract: pass
@abstractmethod
def session(self) -> Session: pass
@abstractmethod
def connect(self, connection_string: str):
def connect(self, database_settings: DatabaseSettings):
r"""Connects to a database by connection string
Parameter