Added demo for SQL Support

This commit is contained in:
Sven Heidemann 2021-11-30 11:10:36 +01:00
parent d9f10edbb7
commit b526a07e19
10 changed files with 120 additions and 57 deletions

View File

@ -8,6 +8,7 @@ from cpl_core.database.connection.database_connection_abc import \
from cpl_core.database.database_settings import DatabaseSettings from cpl_core.database.database_settings import DatabaseSettings
from cpl_core.utils import CredentialManager from cpl_core.utils import CredentialManager
from mysql.connector.abstracts import MySQLConnectionAbstract from mysql.connector.abstracts import MySQLConnectionAbstract
from mysql.connector.cursor import MySQLCursorBuffered
class DatabaseConnection(DatabaseConnectionABC): class DatabaseConnection(DatabaseConnectionABC):
@ -17,15 +18,30 @@ class DatabaseConnection(DatabaseConnectionABC):
def __init__(self): def __init__(self):
DatabaseConnectionABC.__init__(self) DatabaseConnectionABC.__init__(self)
self._database_server: Optional[MySQLConnectionAbstract] = None self._database: Optional[MySQLConnectionAbstract] = None
self._cursor: Optional[MySQLCursorBuffered] = None
@property @property
def server(self) -> MySQLConnectionAbstract: def server(self) -> MySQLConnectionAbstract:
return self._database_server return self._database
@property
def cursor(self) -> MySQLCursorBuffered:
return self._cursor
def connect(self, database_settings: DatabaseSettings): def connect(self, database_settings: DatabaseSettings):
try: try:
self._database_server = sql.connect( connection = sql.connect(
host=database_settings.host,
user=database_settings.user,
passwd=CredentialManager.decrypt(database_settings.password),
charset=database_settings.charset,
use_unicode=database_settings.use_unicode,
buffered=database_settings.buffered,
auth_plugin=database_settings.auth_plugin
)
connection.cursor().execute(f'CREATE DATABASE IF NOT EXISTS `{database_settings.database}`;')
self._database = sql.connect(
host=database_settings.host, host=database_settings.host,
user=database_settings.user, user=database_settings.user,
passwd=CredentialManager.decrypt(database_settings.password), passwd=CredentialManager.decrypt(database_settings.password),
@ -35,6 +51,7 @@ class DatabaseConnection(DatabaseConnectionABC):
buffered=database_settings.buffered, buffered=database_settings.buffered,
auth_plugin=database_settings.auth_plugin auth_plugin=database_settings.auth_plugin
) )
self._cursor = self._database.cursor()
Console.set_foreground_color(ForegroundColorEnum.green) Console.set_foreground_color(ForegroundColorEnum.green)
Console.write_line(f'[{__name__}] Connected to database') Console.write_line(f'[{__name__}] Connected to database')
Console.set_foreground_color(ForegroundColorEnum.default) Console.set_foreground_color(ForegroundColorEnum.default)

View File

@ -2,6 +2,7 @@ from abc import ABC, abstractmethod
from cpl_core.database.database_settings import DatabaseSettings from cpl_core.database.database_settings import DatabaseSettings
from mysql.connector.abstracts import MySQLConnectionAbstract from mysql.connector.abstracts import MySQLConnectionAbstract
from mysql.connector.cursor import MySQLCursorBuffered
class DatabaseConnectionABC(ABC): class DatabaseConnectionABC(ABC):
@ -14,6 +15,10 @@ class DatabaseConnectionABC(ABC):
@abstractmethod @abstractmethod
def server(self) -> MySQLConnectionAbstract: pass def server(self) -> MySQLConnectionAbstract: pass
@property
@abstractmethod
def cursor(self) -> MySQLCursorBuffered: pass
@abstractmethod @abstractmethod
def connect(self, database_settings: DatabaseSettings): def connect(self, database_settings: DatabaseSettings):
r"""Connects to a database by connection string r"""Connects to a database by connection string

View File

@ -8,6 +8,7 @@ from cpl_core.database.connection.database_connection_abc import \
from cpl_core.database.context.database_context_abc import DatabaseContextABC from cpl_core.database.context.database_context_abc import DatabaseContextABC
from cpl_core.database.database_settings import DatabaseSettings from cpl_core.database.database_settings import DatabaseSettings
from cpl_core.database.table_abc import TableABC from cpl_core.database.table_abc import TableABC
from mysql.connector.cursor import MySQLCursorBuffered
class DatabaseContext(DatabaseContextABC): class DatabaseContext(DatabaseContextABC):
@ -22,21 +23,22 @@ class DatabaseContext(DatabaseContextABC):
DatabaseContextABC.__init__(self, database_settings) DatabaseContextABC.__init__(self, database_settings)
self._db: DatabaseConnectionABC = DatabaseConnection() self._db: DatabaseConnectionABC = DatabaseConnection()
self._cursor: Optional[str] = None
self._tables: list[TableABC] = TableABC.__subclasses__() self._tables: list[TableABC] = TableABC.__subclasses__()
@property @property
def cursor(self): def cursor(self) -> MySQLCursorBuffered:
return self._cursor return self._db.cursor
def connect(self, database_settings: DatabaseSettings): def connect(self, database_settings: DatabaseSettings):
self._db.connect(database_settings) self._db.connect(database_settings)
c = self._db.server.cursor()
self._cursor = c
Console.write_line(f"Ts: {self._tables}") Console.write_line(f"Ts: {self._tables}")
for table in self._tables: for table in self._tables:
Console.write_line(f"{table}, {table.create_string}") Console.write_line(f"{table}, {table.get_create_string()}")
c.execute(table.create_string) self._db.cursor.execute(table.get_create_string())
def save_changes(self): def save_changes(self):
self._db.server.commit() self._db.server.commit()
def select(self, statement: str) -> list:
self._db.cursor.execute(statement)
return self._db.cursor.fetchall()

View File

@ -1,6 +1,7 @@
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from cpl_core.database.database_settings import DatabaseSettings from cpl_core.database.database_settings import DatabaseSettings
from mysql.connector.cursor import MySQLCursorBuffered
class DatabaseContextABC(ABC): class DatabaseContextABC(ABC):
@ -11,8 +12,8 @@ class DatabaseContextABC(ABC):
pass pass
@property @property
@abstractmethod def cursor(self) -> MySQLCursorBuffered:
def cursor(self): pass return self._cursor
@abstractmethod @abstractmethod
def connect(self, database_settings: DatabaseSettings): def connect(self, database_settings: DatabaseSettings):
@ -24,6 +25,21 @@ class DatabaseContextABC(ABC):
""" """
pass pass
@abstractmethod
def save_changes(self): def save_changes(self):
r"""Saves changes of the database""" r"""Saves changes of the database"""
pass pass
@abstractmethod
def select(self, statement: str) -> list:
r"""Runs SQL Statements
Parameter
---------
statement: :class:`str`
Returns
-------
list: Fetched list of selected elements
"""
pass

View File

@ -18,9 +18,9 @@ class TableABC(ABC):
def LastModifiedAt(self) -> datetime: def LastModifiedAt(self) -> datetime:
return self._modified_at return self._modified_at
@property @staticmethod
@abstractmethod @abstractmethod
def create_string(self) -> str: pass def get_create_string() -> str: pass
@property @property
@abstractmethod @abstractmethod

View File

@ -26,10 +26,12 @@ class Application(ApplicationABC):
self._logger.debug(__name__, f'Customer: {self._configuration.environment.customer}') self._logger.debug(__name__, f'Customer: {self._configuration.environment.customer}')
user_repo: UserRepo = self._services.get_service(UserRepoABC) user_repo: UserRepo = self._services.get_service(UserRepoABC)
if len(user_repo.get_users()) == 0:
user_repo.add_test_user() user_repo.add_test_user()
Console.write_line('Users:') Console.write_line('Users:')
for user in user_repo.get_users(): for user in user_repo.get_users():
Console.write_line(user.UserId, user.Name, user.CityId, user.City.CityId, user.City.Name, user.City.ZIP) Console.write_line(user.UserId, user.Name, user.City)
Console.write_line('Cities:') Console.write_line('Cities:')
for city in user_repo.get_cities(): for city in user_repo.get_cities():

View File

@ -3,44 +3,45 @@ from cpl_core.database import TableABC
class CityModel(TableABC): class CityModel(TableABC):
def __init__(self, name: str, zip_code: str): def __init__(self, name: str, zip_code: str, id = 0):
self.CityId = 0 self.CityId = id
self.Name = name self.Name = name
self.ZIP = zip_code self.ZIP = zip_code
@property @staticmethod
def create_string(self) -> str: def get_create_string() -> str:
return f""" return str(f"""
CREATE TABLE IF NOT EXISTS `City` ( CREATE TABLE IF NOT EXISTS `City` (
`CityId` INT(30) NOT NULL AUTO_INCREMENT, `CityId` INT(30) NOT NULL AUTO_INCREMENT,
`Name` VARCHAR NOT NULL, `Name` VARCHAR(64) NOT NULL,
`ZIP` VARCHAR NOT NULL, `ZIP` VARCHAR(5) NOT NULL,
PRIMARY KEY(`CityId`)
); );
""" """)
@property @property
def insert_string(self) -> str: def insert_string(self) -> str:
return f""" return str(f"""
INSERT INTO `City` ( INSERT INTO `City` (
`Name`, `ZIP` `Name`, `ZIP`
) VALUES ( ) VALUES (
'{self.Name}', '{self.Name}',
'{self.ZIP}' '{self.ZIP}'
); );
""" """)
@property @property
def udpate_string(self) -> str: def udpate_string(self) -> str:
return f""" return str(f"""
UPDATE `City` UPDATE `City`
SET `Name` = '{self.Name}', SET `Name` = '{self.Name}',
`ZIP` = '{self.ZIP}', `ZIP` = '{self.ZIP}',
WHERE `CityId` = {self.Id}; WHERE `CityId` = {self.Id};
""" """)
@property @property
def delete_string(self) -> str: def delete_string(self) -> str:
return f""" return str(f"""
DELETE FROM `City` DELETE FROM `City`
WHERE `CityId` = {self.Id}; WHERE `CityId` = {self.Id};
""" """)

View File

@ -5,46 +5,46 @@ from .city_model import CityModel
class UserModel(TableABC): class UserModel(TableABC):
def __init__(self, name: str, city: CityModel): def __init__(self, name: str, city: CityModel, id = 0):
self.UserId = 0 self.UserId = id
self.Name = name self.Name = name
self.CityId = city.CityId self.CityId = city.CityId if city is not None else 0
self.City = city self.City = city
@property @staticmethod
def create_string(self) -> str: def get_create_string() -> str:
return f""" return str(f"""
CREATE TABLE IF NOT EXISTS `User` ( CREATE TABLE IF NOT EXISTS `User` (
`UserId` INT(30) NOT NULL AUTO_INCREMENT, `UserId` INT(30) NOT NULL AUTO_INCREMENT,
`Name` VARCHAR NOT NULL, `Name` VARCHAR(64) NOT NULL,
`CityId` VARCHAR NOT NULL, `CityId` INT(30),
FOREIGN KEY (`UserId`) REFERENCES City(`CityId`), FOREIGN KEY (`UserId`) REFERENCES City(`CityId`),
PRIMARY KEY(`UserId`) PRIMARY KEY(`UserId`)
); );
""" """)
@property @property
def insert_string(self) -> str: def insert_string(self) -> str:
return f""" return str(f"""
INSERT INTO `User` ( INSERT INTO `User` (
`Name` `Name`
) VALUES ( ) VALUES (
'{self.Name}' '{self.Name}'
); );
""" """)
@property @property
def udpate_string(self) -> str: def udpate_string(self) -> str:
return f""" return str(f"""
UPDATE `User` UPDATE `User`
SET `Name` = '{self.Name}', SET `Name` = '{self.Name}',
`CityId` = {self.CityId}, `CityId` = {self.CityId},
WHERE `UserId` = {self.UserId}; WHERE `UserId` = {self.UserId};
""" """)
@property @property
def delete_string(self) -> str: def delete_string(self) -> str:
return f""" return str(f"""
DELETE FROM `User` DELETE FROM `User`
WHERE `UserId` = {self.UserId}; WHERE `UserId` = {self.UserId};
""" """)

View File

@ -1,4 +1,6 @@
from cpl_core.console import Console
from cpl_core.database.context import DatabaseContextABC from cpl_core.database.context import DatabaseContextABC
from .city_model import CityModel from .city_model import CityModel
from .user_model import UserModel from .user_model import UserModel
from .user_repo_abc import UserRepoABC from .user_repo_abc import UserRepoABC
@ -9,21 +11,32 @@ class UserRepo(UserRepoABC):
def __init__(self, db_context: DatabaseContextABC): def __init__(self, db_context: DatabaseContextABC):
UserRepoABC.__init__(self) UserRepoABC.__init__(self)
self._db: DatabaseContextABC = db_context self._db_context: DatabaseContextABC = db_context
def create(self): pass
def add_test_user(self): def add_test_user(self):
city = CityModel('Haren', '49733') city = CityModel('Haren', '49733')
city2 = CityModel('Meppen', '49716') city2 = CityModel('Meppen', '49716')
self._db.cursor.execute(city2.insert_string) self._db_context.cursor.execute(city2.insert_string)
user = UserModel('TestUser', city) user = UserModel('TestUser', city)
self._db.cursor.execute(user.insert_string) self._db_context.cursor.execute(user.insert_string)
self._db_context.save_changes()
def get_users(self) -> list[UserModel]: def get_users(self) -> list[UserModel]:
self._db.cursor.execute(f"""SELECT * FROM `User`""") users = []
return self._db.cursor.fetchall() results = self._db_context.select('SELECT * FROM `User`')
for result in results:
users.append(UserModel(result[1], self.get_city_by_id(result[2]), id = result[0]))
return users
def get_cities(self) -> list[CityModel]: def get_cities(self) -> list[CityModel]:
self._db.cursor.execute(f"""SELECT * FROM `City`""") cities = []
return self._db.cursor.fetchall() results = self._db_context.select('SELECT * FROM `City`')
for result in results:
cities.append(CityModel(result[1], result[2], id = result[0]))
return cities
def get_city_by_id(self, id: int) -> CityModel:
if id is None:
return None
result = self._db_context.select(f'SELECT * FROM `City` WHERE `Id` = {id}')
return CityModel(result[1], result[2], id = result[0])

View File

@ -1,5 +1,6 @@
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from .city_model import CityModel
from .user_model import UserModel from .user_model import UserModel
@ -10,3 +11,9 @@ class UserRepoABC(ABC):
@abstractmethod @abstractmethod
def get_users(self) -> list[UserModel]: pass def get_users(self) -> list[UserModel]: pass
@abstractmethod
def get_cities(self) -> list[CityModel]: pass
@abstractmethod
def get_city_by_id(self, id: int) -> CityModel: pass