Compare commits
No commits in common. "df316fd53b2b679f6fb6c49956e4b5ef6a97b8dc" and "9353fa749c71ed5cd65b9061888d7b9703c39615" have entirely different histories.
df316fd53b
...
9353fa749c
@ -1,50 +0,0 @@
|
|||||||
from typing import Optional
|
|
||||||
|
|
||||||
from cpl_core.database import DatabaseSettings
|
|
||||||
from cpl_core.database.connection import DatabaseConnectionABC
|
|
||||||
from mysql.connector.abstracts import MySQLConnectionAbstract
|
|
||||||
from mysql.connector.cursor import MySQLCursorBuffered
|
|
||||||
|
|
||||||
|
|
||||||
class DBConnection(DatabaseConnectionABC):
|
|
||||||
def __init__(self):
|
|
||||||
DatabaseConnectionABC.__init__(self)
|
|
||||||
|
|
||||||
self._database: Optional[MySQLConnectionAbstract] = None
|
|
||||||
self._cursor: Optional[MySQLCursorBuffered] = None
|
|
||||||
|
|
||||||
@property
|
|
||||||
def server(self) -> MySQLConnectionAbstract:
|
|
||||||
return self._database
|
|
||||||
|
|
||||||
@property
|
|
||||||
def cursor(self) -> MySQLCursorBuffered:
|
|
||||||
return self._cursor
|
|
||||||
|
|
||||||
def connect(self, settings: DatabaseSettings):
|
|
||||||
# connection = sql.connect(
|
|
||||||
# host=settings.host,
|
|
||||||
# port=settings.port,
|
|
||||||
# user=settings.user,
|
|
||||||
# passwd=CredentialManager.decrypt(settings.password),
|
|
||||||
# charset=settings.charset,
|
|
||||||
# use_unicode=settings.use_unicode,
|
|
||||||
# buffered=settings.buffered,
|
|
||||||
# auth_plugin=settings.auth_plugin,
|
|
||||||
# ssl_disabled=settings.ssl_disabled,
|
|
||||||
# )
|
|
||||||
# connection.cursor().execute(f"CREATE DATABASE IF NOT EXISTS `{settings.database}`;")
|
|
||||||
# self._database = sql.connect(
|
|
||||||
# host=settings.host,
|
|
||||||
# port=settings.port,
|
|
||||||
# user=settings.user,
|
|
||||||
# passwd=CredentialManager.decrypt(settings.password),
|
|
||||||
# db=settings.database,
|
|
||||||
# charset=settings.charset,
|
|
||||||
# use_unicode=settings.use_unicode,
|
|
||||||
# buffered=settings.buffered,
|
|
||||||
# auth_plugin=settings.auth_plugin,
|
|
||||||
# ssl_disabled=settings.ssl_disabled,
|
|
||||||
# )
|
|
||||||
self._
|
|
||||||
self._cursor = self._database.cursor()
|
|
@ -1,3 +1,4 @@
|
|||||||
|
import time
|
||||||
import uuid
|
import uuid
|
||||||
|
|
||||||
from cpl_core.database import DatabaseSettings
|
from cpl_core.database import DatabaseSettings
|
||||||
@ -6,7 +7,6 @@ from cpl_core.database.context import DatabaseContext
|
|||||||
from bot_core.exception.service_error_code_enum import ServiceErrorCode
|
from bot_core.exception.service_error_code_enum import ServiceErrorCode
|
||||||
from bot_core.exception.service_exception import ServiceException
|
from bot_core.exception.service_exception import ServiceException
|
||||||
from bot_core.logging.database_logger import DatabaseLogger
|
from bot_core.logging.database_logger import DatabaseLogger
|
||||||
from bot_data.mysql_pool import MySQLPool
|
|
||||||
|
|
||||||
|
|
||||||
class DBContext(DatabaseContext):
|
class DBContext(DatabaseContext):
|
||||||
@ -14,28 +14,27 @@ class DBContext(DatabaseContext):
|
|||||||
self._logger = logger
|
self._logger = logger
|
||||||
|
|
||||||
DatabaseContext.__init__(self)
|
DatabaseContext.__init__(self)
|
||||||
self._pool: MySQLPool = None
|
|
||||||
self._fails = 0
|
self._fails = 0
|
||||||
|
|
||||||
def connect(self, database_settings: DatabaseSettings):
|
def connect(self, database_settings: DatabaseSettings):
|
||||||
try:
|
try:
|
||||||
self._logger.debug(__name__, "Connecting to database")
|
self._logger.debug(__name__, "Connecting to database")
|
||||||
self._pool = MySQLPool(database_settings)
|
self._db.connect(database_settings)
|
||||||
self._pool.execute(f"CREATE DATABASE IF NOT EXISTS `{database_settings.database}`;", commit=True)
|
|
||||||
self._logger.info(__name__, "Connected to database")
|
self._logger.info(__name__, "Connected to database")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self._logger.fatal(__name__, "Connecting to database failed", e)
|
self._logger.fatal(__name__, "Connecting to database failed", e)
|
||||||
|
|
||||||
@property
|
|
||||||
def cursor(self):
|
|
||||||
return self
|
|
||||||
|
|
||||||
def save_changes(self):
|
def save_changes(self):
|
||||||
pass
|
try:
|
||||||
|
self._logger.trace(__name__, "Save changes")
|
||||||
|
super(DBContext, self).save_changes()
|
||||||
|
self._logger.debug(__name__, "Saved changes")
|
||||||
|
except Exception as e:
|
||||||
|
self._logger.error(__name__, "Saving changes failed", e)
|
||||||
|
|
||||||
def select(self, statement: str) -> list[tuple]:
|
def select(self, statement: str) -> list[tuple]:
|
||||||
try:
|
try:
|
||||||
return self._pool.execute(statement)
|
return super(DBContext, self).select(statement)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
if self._fails >= 3:
|
if self._fails >= 3:
|
||||||
self._logger.error(__name__, f"Database error caused by {statement}", e)
|
self._logger.error(__name__, f"Database error caused by {statement}", e)
|
||||||
@ -48,11 +47,9 @@ class DBContext(DatabaseContext):
|
|||||||
self._logger.error(__name__, f"Database error caused by {statement}", e)
|
self._logger.error(__name__, f"Database error caused by {statement}", e)
|
||||||
self._fails += 1
|
self._fails += 1
|
||||||
try:
|
try:
|
||||||
|
time.sleep(0.5)
|
||||||
self._logger.debug(__name__, "Retry select")
|
self._logger.debug(__name__, "Retry select")
|
||||||
return self.select(statement)
|
return self.select(statement)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
pass
|
pass
|
||||||
return []
|
return []
|
||||||
|
|
||||||
def execute(self, statement: str):
|
|
||||||
return self._pool.execute(statement, commit=True)
|
|
||||||
|
@ -1,104 +0,0 @@
|
|||||||
# https://stackoverflow.com/questions/32658679/how-to-create-a-mysql-connection-pool-or-any-better-way-to-initialize-the-multip
|
|
||||||
import mysql.connector as sql
|
|
||||||
from cpl_core.database import DatabaseSettings
|
|
||||||
from cpl_core.utils import CredentialManager
|
|
||||||
|
|
||||||
|
|
||||||
class MySQLPool(object):
|
|
||||||
"""
|
|
||||||
create a pool when connect mysql, which will decrease the time spent in
|
|
||||||
request connection, create connection and close connection.
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(
|
|
||||||
self,
|
|
||||||
database_settings: DatabaseSettings,
|
|
||||||
pool_size=5,
|
|
||||||
):
|
|
||||||
res = {
|
|
||||||
"host": database_settings.host,
|
|
||||||
"port": database_settings.port,
|
|
||||||
"user": database_settings.user,
|
|
||||||
"password": CredentialManager.decrypt(database_settings.password),
|
|
||||||
"database": database_settings.database,
|
|
||||||
}
|
|
||||||
|
|
||||||
self.dbconfig = res
|
|
||||||
self.pool = self.create_pool(pool_name="MySqlPool", pool_size=pool_size)
|
|
||||||
|
|
||||||
def create_pool(self, pool_name="MySqlPool", pool_size=3):
|
|
||||||
"""
|
|
||||||
Create a connection pool, after created, the request of connecting
|
|
||||||
MySQL could get a connection from this pool instead of request to
|
|
||||||
create a connection.
|
|
||||||
:param pool_name: the name of pool, default is "mypool"
|
|
||||||
:param pool_size: the size of pool, default is 3
|
|
||||||
:return: connection pool
|
|
||||||
"""
|
|
||||||
pool = sql.pooling.MySQLConnectionPool(
|
|
||||||
pool_name=pool_name, pool_size=pool_size, pool_reset_session=True, **self.dbconfig
|
|
||||||
)
|
|
||||||
return pool
|
|
||||||
|
|
||||||
def close(self, conn, cursor):
|
|
||||||
"""
|
|
||||||
A method used to close connection of mysql.
|
|
||||||
:param conn:
|
|
||||||
:param cursor:
|
|
||||||
:return:
|
|
||||||
"""
|
|
||||||
cursor.close()
|
|
||||||
conn.close()
|
|
||||||
|
|
||||||
def execute(self, sql, args=None, commit=False):
|
|
||||||
"""
|
|
||||||
Execute a sql, it could be with args and with out args. The usage is
|
|
||||||
similar with execute() function in module pymysql.
|
|
||||||
:param sql: sql clause
|
|
||||||
:param args: args need by sql clause
|
|
||||||
:param commit: whether to commit
|
|
||||||
:return: if commit, return None, else, return result
|
|
||||||
"""
|
|
||||||
# get connection form connection pool instead of create one.
|
|
||||||
conn = self.pool.get_connection()
|
|
||||||
cursor = conn.cursor()
|
|
||||||
if args:
|
|
||||||
cursor.execute(sql, args)
|
|
||||||
else:
|
|
||||||
cursor.execute(sql)
|
|
||||||
if commit is True:
|
|
||||||
conn.commit()
|
|
||||||
self.close(conn, cursor)
|
|
||||||
return None
|
|
||||||
else:
|
|
||||||
res = cursor.fetchall()
|
|
||||||
self.close(conn, cursor)
|
|
||||||
return res
|
|
||||||
|
|
||||||
def executemany(self, sql, args, commit=False):
|
|
||||||
"""
|
|
||||||
Execute with many args. Similar with executemany() function in pymysql.
|
|
||||||
args should be a sequence.
|
|
||||||
:param sql: sql clause
|
|
||||||
:param args: args
|
|
||||||
:param commit: commit or not.
|
|
||||||
:return: if commit, return None, else, return result
|
|
||||||
"""
|
|
||||||
# get connection form connection pool instead of create one.
|
|
||||||
conn = self.pool.get_connection()
|
|
||||||
cursor = conn.cursor()
|
|
||||||
cursor.executemany(sql, args)
|
|
||||||
if commit is True:
|
|
||||||
conn.commit()
|
|
||||||
self.close(conn, cursor)
|
|
||||||
return None
|
|
||||||
else:
|
|
||||||
res = cursor.fetchall()
|
|
||||||
self.close(conn, cursor)
|
|
||||||
return res
|
|
||||||
|
|
||||||
def commit(self):
|
|
||||||
conn = self.pool.get_connection()
|
|
||||||
conn.commit()
|
|
||||||
cursor = conn.cursor()
|
|
||||||
self.close(conn, cursor)
|
|
@ -1,13 +1,13 @@
|
|||||||
import glob
|
import glob
|
||||||
import os
|
import os
|
||||||
|
|
||||||
|
from cpl_core.database.context import DatabaseContextABC
|
||||||
from cpl_core.dependency_injection import ServiceProviderABC
|
from cpl_core.dependency_injection import ServiceProviderABC
|
||||||
from cpl_query.extension import List
|
from cpl_query.extension import List
|
||||||
from packaging import version
|
from packaging import version
|
||||||
|
|
||||||
import bot
|
import bot
|
||||||
from bot_core.logging.database_logger import DatabaseLogger
|
from bot_core.logging.database_logger import DatabaseLogger
|
||||||
from bot_data.db_context import DBContext
|
|
||||||
from bot_data.model.migration import Migration
|
from bot_data.model.migration import Migration
|
||||||
from bot_data.model.migration_history import MigrationHistory
|
from bot_data.model.migration_history import MigrationHistory
|
||||||
|
|
||||||
@ -17,12 +17,13 @@ class MigrationService:
|
|||||||
self,
|
self,
|
||||||
logger: DatabaseLogger,
|
logger: DatabaseLogger,
|
||||||
services: ServiceProviderABC,
|
services: ServiceProviderABC,
|
||||||
db: DBContext,
|
db: DatabaseContextABC,
|
||||||
):
|
):
|
||||||
self._logger = logger
|
self._logger = logger
|
||||||
self._services = services
|
self._services = services
|
||||||
|
|
||||||
self._db = db
|
self._db = db
|
||||||
|
self._cursor = db.cursor
|
||||||
|
|
||||||
def _get_migration_history(self) -> List[MigrationHistory]:
|
def _get_migration_history(self) -> List[MigrationHistory]:
|
||||||
results = self._db.select(
|
results = self._db.select(
|
||||||
@ -41,7 +42,7 @@ class MigrationService:
|
|||||||
return
|
return
|
||||||
|
|
||||||
self._logger.debug(__name__, f"Migrate new migration {migration.migration_id} to old method")
|
self._logger.debug(__name__, f"Migrate new migration {migration.migration_id} to old method")
|
||||||
self._db.execute(migration.change_id_string(f"{migration.migration_id}Migration"))
|
self._cursor.execute(migration.change_id_string(f"{migration.migration_id}Migration"))
|
||||||
self._db.save_changes()
|
self._db.save_changes()
|
||||||
|
|
||||||
def _migration_migrations_to_new(self, migration: MigrationHistory):
|
def _migration_migrations_to_new(self, migration: MigrationHistory):
|
||||||
@ -49,11 +50,12 @@ class MigrationService:
|
|||||||
return
|
return
|
||||||
|
|
||||||
self._logger.debug(__name__, f"Migrate old migration {migration.migration_id} to new method")
|
self._logger.debug(__name__, f"Migrate old migration {migration.migration_id} to new method")
|
||||||
self._db.execute(migration.change_id_string(migration.migration_id.replace("Migration", "")))
|
self._cursor.execute(migration.change_id_string(migration.migration_id.replace("Migration", "")))
|
||||||
self._db.save_changes()
|
self._db.save_changes()
|
||||||
|
|
||||||
def _migrate_from_old_to_new(self):
|
def _migrate_from_old_to_new(self):
|
||||||
result = self._db.select("SHOW TABLES LIKE 'MigrationHistory'")
|
self._cursor.execute("SHOW TABLES LIKE 'MigrationHistory'")
|
||||||
|
result = self._cursor.fetchone()
|
||||||
if not result:
|
if not result:
|
||||||
return
|
return
|
||||||
|
|
||||||
@ -118,7 +120,8 @@ class MigrationService:
|
|||||||
active_statement = ""
|
active_statement = ""
|
||||||
try:
|
try:
|
||||||
# check if table exists
|
# check if table exists
|
||||||
result = self._db.select("SHOW TABLES LIKE 'MigrationHistory'")
|
self._cursor.execute("SHOW TABLES LIKE 'MigrationHistory'")
|
||||||
|
result = self._cursor.fetchone()
|
||||||
if result:
|
if result:
|
||||||
# there is a table named "tableName"
|
# there is a table named "tableName"
|
||||||
self._logger.trace(
|
self._logger.trace(
|
||||||
@ -139,9 +142,9 @@ class MigrationService:
|
|||||||
if statement in ["", "\n"]:
|
if statement in ["", "\n"]:
|
||||||
continue
|
continue
|
||||||
active_statement = statement
|
active_statement = statement
|
||||||
self._db.execute(statement + ";")
|
self._cursor.execute(statement + ";")
|
||||||
|
|
||||||
self._db.execute(
|
self._cursor.execute(
|
||||||
MigrationHistory(migration.name).insert_string
|
MigrationHistory(migration.name).insert_string
|
||||||
if upgrade
|
if upgrade
|
||||||
else MigrationHistory(migration.name).delete_string
|
else MigrationHistory(migration.name).delete_string
|
||||||
|
@ -2,7 +2,6 @@ from datetime import datetime
|
|||||||
from typing import Optional
|
from typing import Optional
|
||||||
|
|
||||||
from cpl_core.database.context import DatabaseContextABC
|
from cpl_core.database.context import DatabaseContextABC
|
||||||
from cpl_core.time import TimeFormatSettings
|
|
||||||
from cpl_query.extension import List
|
from cpl_query.extension import List
|
||||||
|
|
||||||
from bot_core.logging.database_logger import DatabaseLogger
|
from bot_core.logging.database_logger import DatabaseLogger
|
||||||
@ -16,14 +15,12 @@ from bot_data.model.user_message_count_per_hour import UserMessageCountPerHour
|
|||||||
class UserMessageCountPerHourRepositoryService(UserMessageCountPerHourRepositoryABC):
|
class UserMessageCountPerHourRepositoryService(UserMessageCountPerHourRepositoryABC):
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
time_format: TimeFormatSettings,
|
|
||||||
logger: DatabaseLogger,
|
logger: DatabaseLogger,
|
||||||
db_context: DatabaseContextABC,
|
db_context: DatabaseContextABC,
|
||||||
users: UserRepositoryABC,
|
users: UserRepositoryABC,
|
||||||
):
|
):
|
||||||
UserMessageCountPerHourRepositoryABC.__init__(self)
|
UserMessageCountPerHourRepositoryABC.__init__(self)
|
||||||
|
|
||||||
self._time_format = time_format
|
|
||||||
self._logger = logger
|
self._logger = logger
|
||||||
self._context = db_context
|
self._context = db_context
|
||||||
self._users = users
|
self._users = users
|
||||||
@ -70,12 +67,7 @@ class UserMessageCountPerHourRepositoryService(UserMessageCountPerHourRepository
|
|||||||
) -> UserMessageCountPerHour:
|
) -> UserMessageCountPerHour:
|
||||||
sql = UserMessageCountPerHour.get_select_by_user_id_and_date_string(user_id, date)
|
sql = UserMessageCountPerHour.get_select_by_user_id_and_date_string(user_id, date)
|
||||||
self._logger.trace(__name__, f"Send SQL command: {sql}")
|
self._logger.trace(__name__, f"Send SQL command: {sql}")
|
||||||
res = self._context.select(sql)
|
return self._from_result(self._context.select(sql)[0])
|
||||||
if len(res) > 0:
|
|
||||||
return self._from_result(res[0])
|
|
||||||
|
|
||||||
user = self._users.get_user_by_id(user_id)
|
|
||||||
return UserMessageCountPerHour(date.strftime(self._time_format.date_time_format), date.hour, 0, user)
|
|
||||||
|
|
||||||
def find_user_message_count_per_hour_by_user_id_and_date(
|
def find_user_message_count_per_hour_by_user_id_and_date(
|
||||||
self, user_id: int, date: datetime
|
self, user_id: int, date: datetime
|
||||||
|
Loading…
Reference in New Issue
Block a user