Vergebene XP für Nachrichten und Reaktionen pro Stunde begrenzen #168 #176

Merged
edraft merged 6 commits from #168 into 0.3.1 2023-01-13 22:47:13 +01:00
4 changed files with 205 additions and 205 deletions
Showing only changes of commit 59fc1e1442 - Show all commits

View File

@ -25,8 +25,8 @@ from bot_data.service.seeder_service import SeederService
from bot_data.service.server_repository_service import ServerRepositoryService
from bot_data.service.statistic_repository_service import StatisticRepositoryService
from bot_data.service.user_joined_server_repository_service import UserJoinedServerRepositoryService
from bot_data.service.user_joined_voice_channel_repository_service import UserMessageCountPerHourRepositoryService
from bot_data.service.user_joined_voice_channel_service import UserJoinedVoiceChannelRepositoryService
from bot_data.service.user_joined_voice_channel_repository_service import UserJoinedVoiceChannelRepositoryService
from bot_data.service.user_message_count_per_hour_repository_service import UserMessageCountPerHourRepositoryService
from bot_data.service.user_repository_service import UserRepositoryService

View File

@ -1,101 +1,127 @@
from datetime import datetime
from typing import Optional
from cpl_core.database.context import DatabaseContextABC
from cpl_query.extension import List
from bot_core.logging.database_logger import DatabaseLogger
from bot_data.abc.user_message_count_per_hour_repository_abc import UserMessageCountPerHourRepositoryABC
from bot_data.abc.user_joined_voice_channel_repository_abc import UserJoinedVoiceChannelRepositoryABC
from bot_data.abc.user_repository_abc import UserRepositoryABC
from bot_data.model.user_message_count_per_hour import UserMessageCountPerHour
from bot_data.model.user_joined_voice_channel import UserJoinedVoiceChannel
class UserMessageCountPerHourRepositoryService(UserMessageCountPerHourRepositoryABC):
def __init__(
self,
logger: DatabaseLogger,
db_context: DatabaseContextABC,
users: UserRepositoryABC,
):
UserMessageCountPerHourRepositoryABC.__init__(self)
class UserJoinedVoiceChannelRepositoryService(UserJoinedVoiceChannelRepositoryABC):
def __init__(self, logger: DatabaseLogger, db_context: DatabaseContextABC, users: UserRepositoryABC):
self._logger = logger
self._context = db_context
self._users = users
@staticmethod
def _get_value_from_result(value: any) -> Optional[any]:
if isinstance(value, str) and 'NULL' in value:
return None
UserJoinedVoiceChannelRepositoryABC.__init__(self)
return value
def get_user_joined_voice_channels(self) -> List[UserJoinedVoiceChannel]:
joins = List(UserJoinedVoiceChannel)
self._logger.trace(__name__, f'Send SQL command: {UserJoinedVoiceChannel.get_select_all_string()}')
results = self._context.select(UserJoinedVoiceChannel.get_select_all_string())
for result in results:
self._logger.trace(__name__, f'Get user-joined-voice-channel with id {result[0]}')
joins.append(UserJoinedVoiceChannel(
self._users.get_user_by_id(result[1]),
result[2],
result[3],
result[4],
result[5],
id=result[0]
))
def _from_result(self, result: tuple) -> UserMessageCountPerHour:
return UserMessageCountPerHour(
self._get_value_from_result(result[1]),
self._get_value_from_result(result[2]),
self._get_value_from_result(result[3]),
self._users.get_user_by_id(self._get_value_from_result(result[4])),
self._get_value_from_result(result[5]),
self._get_value_from_result(result[6]),
id=self._get_value_from_result(result[0])
return joins
def get_user_joined_voice_channel_by_id(self, id: int) -> UserJoinedVoiceChannel:
self._logger.trace(__name__, f'Send SQL command: {UserJoinedVoiceChannel.get_select_by_id_string(id)}')
result = self._context.select(UserJoinedVoiceChannel.get_select_by_id_string(id))[0]
return UserJoinedVoiceChannel(
self._users.get_user_by_id(result[1]),
result[2],
result[3],
result[4],
result[5],
id=result[0]
)
def get_user_message_count_per_hours(self) -> List[UserMessageCountPerHour]:
umcphs = List(UserMessageCountPerHour)
self._logger.trace(__name__, f'Send SQL command: {UserMessageCountPerHour.get_select_all_string()}')
results = self._context.select(UserMessageCountPerHour.get_select_all_string())
def get_user_joined_voice_channels_by_user_id(self, user_id: int) -> List[UserJoinedVoiceChannel]:
joins = List(UserJoinedVoiceChannel)
self._logger.trace(__name__, f'Send SQL command: {UserJoinedVoiceChannel.get_select_by_user_id_string(user_id)}')
results = self._context.select(UserJoinedVoiceChannel.get_select_by_user_id_string(user_id))
for result in results:
self._logger.trace(__name__, f'Get user message count per hour with id {result[0]}')
umcphs.append(self._from_result(result))
return umcphs
def find_user_message_count_per_hour_by_user_id(self, user_id: int) -> List[Optional[UserMessageCountPerHour]]:
umcphs = List(UserMessageCountPerHour)
sql = UserMessageCountPerHour.get_select_by_user_id_string(user_id)
self._logger.trace(__name__, f'Send SQL command: {sql}')
results = self._context.select(sql)
if results is None or len(results) == 0:
return umcphs
for result in results:
self._logger.trace(__name__, f'Get user message count per hour with id {result[0]}')
umcphs.append(self._from_result(result))
return umcphs
def get_user_message_count_per_hour_by_user_id_and_date(self, user_id: int, date: datetime) -> \
Optional[UserMessageCountPerHour]:
sql = UserMessageCountPerHour.get_select_by_user_id_and_date_string(user_id, date)
self._logger.trace(__name__, f'Send SQL command: {sql}')
result = self._context.select(sql)[0]
return self._from_result(result)
def find_user_message_count_per_hour_by_user_id_and_date(self, user_id: int, date: datetime) -> \
Optional[UserMessageCountPerHour]:
sql = UserMessageCountPerHour.get_select_by_user_id_and_date_string(user_id, date)
self._logger.trace(__name__, f'Send SQL command: {sql}')
result = self._context.select(sql)
joins.append(UserJoinedVoiceChannel(
self._users.get_user_by_id(result[1]),
result[2],
result[3],
result[4],
result[5],
id=result[0]
))
return joins
def get_active_user_joined_voice_channel_by_user_id(self, user_id: int) -> UserJoinedVoiceChannel:
self._logger.trace(__name__, f'Send SQL command: {UserJoinedVoiceChannel.get_select_by_user_id_string(user_id)}')
result = self._context.select(UserJoinedVoiceChannel.get_select_active_by_user_id_string(user_id))[0]
return UserJoinedVoiceChannel(
self._users.get_user_by_id(result[1]),
result[2],
result[3],
result[4],
result[5],
id=result[0]
)
def find_active_user_joined_voice_channel_by_user_id(self, user_id: int) -> Optional[UserJoinedVoiceChannel]:
self._logger.trace(__name__, f'Send SQL command: {UserJoinedVoiceChannel.get_select_by_user_id_string(user_id)}')
result = self._context.select(UserJoinedVoiceChannel.get_select_active_by_user_id_string(user_id))
if result is None or len(result) == 0:
return None
return self._from_result(result[0])
result = result[0]
def add_user_message_count_per_hour(self, umcph: UserMessageCountPerHour):
self._logger.trace(__name__, f'Send SQL command: {umcph.insert_string}')
self._context.cursor.execute(umcph.insert_string)
return UserJoinedVoiceChannel(
self._users.get_user_by_id(result[1]),
result[2],
result[3],
result[4],
result[5],
id=result[0]
)
def update_user_message_count_per_hour(self, umcph: UserMessageCountPerHour):
self._logger.trace(__name__, f'Send SQL command: {umcph.udpate_string}')
self._context.cursor.execute(umcph.udpate_string)
def find_active_user_joined_voice_channels_by_user_id(self, user_id: int) -> List[Optional[UserJoinedVoiceChannel]]:
self._logger.trace(__name__, f'Send SQL command: {UserJoinedVoiceChannel.get_select_active_by_user_id_string(user_id)}')
result = List(UserJoinedVoiceChannel)
db_results = self._context.select(UserJoinedVoiceChannel.get_select_active_by_user_id_string(user_id))
def delete_user_message_count_per_hour(self, umcph: UserMessageCountPerHour):
self._logger.trace(__name__, f'Send SQL command: {umcph.delete_string}')
self._context.cursor.execute(umcph.delete_string)
for db_result in db_results:
result.append(UserJoinedVoiceChannel(
self._users.get_user_by_id(db_result[1]),
db_result[2],
db_result[3],
db_result[4],
db_result[5],
id=db_result[0]
))
def delete_user_message_count_per_hour_by_user_id(self, user_id: int):
sql = UserMessageCountPerHour.delete_by_user_id_string(user_id)
self._logger.trace(__name__, f'Send SQL command: {sql}')
self._context.cursor.execute(sql)
return result
def add_user_joined_voice_channel(self, user_joined_voice_channel: UserJoinedVoiceChannel):
self._logger.trace(__name__, f'Send SQL command: {user_joined_voice_channel.insert_string}')
self._context.cursor.execute(user_joined_voice_channel.insert_string)
def update_user_joined_voice_channel(self, user_joined_voice_channel: UserJoinedVoiceChannel):
self._logger.trace(__name__, f'Send SQL command: {user_joined_voice_channel.udpate_string}')
self._context.cursor.execute(user_joined_voice_channel.udpate_string)
def delete_user_joined_voice_channel(self, user_joined_voice_channel: UserJoinedVoiceChannel):
self._logger.trace(__name__, f'Send SQL command: {user_joined_voice_channel.delete_string}')
self._context.cursor.execute(user_joined_voice_channel.delete_string)
def delete_user_joined_voice_channel_by_user_id(self, user_id: int):
self._logger.trace(__name__, f'Send SQL command: {UserJoinedVoiceChannel.delete_by_user_id_string}')
self._context.cursor.execute(UserJoinedVoiceChannel.delete_by_user_id_string(user_id))

View File

@ -1,127 +0,0 @@
from typing import Optional
from cpl_core.database.context import DatabaseContextABC
from cpl_query.extension import List
from bot_core.logging.database_logger import DatabaseLogger
from bot_data.abc.user_joined_voice_channel_repository_abc import UserJoinedVoiceChannelRepositoryABC
from bot_data.abc.user_repository_abc import UserRepositoryABC
from bot_data.model.user_joined_voice_channel import UserJoinedVoiceChannel
class UserJoinedVoiceChannelRepositoryService(UserJoinedVoiceChannelRepositoryABC):
def __init__(self, logger: DatabaseLogger, db_context: DatabaseContextABC, users: UserRepositoryABC):
self._logger = logger
self._context = db_context
self._users = users
UserJoinedVoiceChannelRepositoryABC.__init__(self)
def get_user_joined_voice_channels(self) -> List[UserJoinedVoiceChannel]:
joins = List(UserJoinedVoiceChannel)
self._logger.trace(__name__, f'Send SQL command: {UserJoinedVoiceChannel.get_select_all_string()}')
results = self._context.select(UserJoinedVoiceChannel.get_select_all_string())
for result in results:
self._logger.trace(__name__, f'Get user-joined-voice-channel with id {result[0]}')
joins.append(UserJoinedVoiceChannel(
self._users.get_user_by_id(result[1]),
result[2],
result[3],
result[4],
result[5],
id=result[0]
))
return joins
def get_user_joined_voice_channel_by_id(self, id: int) -> UserJoinedVoiceChannel:
self._logger.trace(__name__, f'Send SQL command: {UserJoinedVoiceChannel.get_select_by_id_string(id)}')
result = self._context.select(UserJoinedVoiceChannel.get_select_by_id_string(id))[0]
return UserJoinedVoiceChannel(
self._users.get_user_by_id(result[1]),
result[2],
result[3],
result[4],
result[5],
id=result[0]
)
def get_user_joined_voice_channels_by_user_id(self, user_id: int) -> List[UserJoinedVoiceChannel]:
joins = List(UserJoinedVoiceChannel)
self._logger.trace(__name__, f'Send SQL command: {UserJoinedVoiceChannel.get_select_by_user_id_string(user_id)}')
results = self._context.select(UserJoinedVoiceChannel.get_select_by_user_id_string(user_id))
for result in results:
joins.append(UserJoinedVoiceChannel(
self._users.get_user_by_id(result[1]),
result[2],
result[3],
result[4],
result[5],
id=result[0]
))
return joins
def get_active_user_joined_voice_channel_by_user_id(self, user_id: int) -> UserJoinedVoiceChannel:
self._logger.trace(__name__, f'Send SQL command: {UserJoinedVoiceChannel.get_select_by_user_id_string(user_id)}')
result = self._context.select(UserJoinedVoiceChannel.get_select_active_by_user_id_string(user_id))[0]
return UserJoinedVoiceChannel(
self._users.get_user_by_id(result[1]),
result[2],
result[3],
result[4],
result[5],
id=result[0]
)
def find_active_user_joined_voice_channel_by_user_id(self, user_id: int) -> Optional[UserJoinedVoiceChannel]:
self._logger.trace(__name__, f'Send SQL command: {UserJoinedVoiceChannel.get_select_by_user_id_string(user_id)}')
result = self._context.select(UserJoinedVoiceChannel.get_select_active_by_user_id_string(user_id))
if result is None or len(result) == 0:
return None
result = result[0]
return UserJoinedVoiceChannel(
self._users.get_user_by_id(result[1]),
result[2],
result[3],
result[4],
result[5],
id=result[0]
)
def find_active_user_joined_voice_channels_by_user_id(self, user_id: int) -> List[Optional[UserJoinedVoiceChannel]]:
self._logger.trace(__name__, f'Send SQL command: {UserJoinedVoiceChannel.get_select_active_by_user_id_string(user_id)}')
result = List(UserJoinedVoiceChannel)
db_results = self._context.select(UserJoinedVoiceChannel.get_select_active_by_user_id_string(user_id))
for db_result in db_results:
result.append(UserJoinedVoiceChannel(
self._users.get_user_by_id(db_result[1]),
db_result[2],
db_result[3],
db_result[4],
db_result[5],
id=db_result[0]
))
return result
def add_user_joined_voice_channel(self, user_joined_voice_channel: UserJoinedVoiceChannel):
self._logger.trace(__name__, f'Send SQL command: {user_joined_voice_channel.insert_string}')
self._context.cursor.execute(user_joined_voice_channel.insert_string)
def update_user_joined_voice_channel(self, user_joined_voice_channel: UserJoinedVoiceChannel):
self._logger.trace(__name__, f'Send SQL command: {user_joined_voice_channel.udpate_string}')
self._context.cursor.execute(user_joined_voice_channel.udpate_string)
def delete_user_joined_voice_channel(self, user_joined_voice_channel: UserJoinedVoiceChannel):
self._logger.trace(__name__, f'Send SQL command: {user_joined_voice_channel.delete_string}')
self._context.cursor.execute(user_joined_voice_channel.delete_string)
def delete_user_joined_voice_channel_by_user_id(self, user_id: int):
self._logger.trace(__name__, f'Send SQL command: {UserJoinedVoiceChannel.delete_by_user_id_string}')
self._context.cursor.execute(UserJoinedVoiceChannel.delete_by_user_id_string(user_id))

View File

@ -0,0 +1,101 @@
from datetime import datetime
from typing import Optional
from cpl_core.database.context import DatabaseContextABC
from cpl_query.extension import List
from bot_core.logging.database_logger import DatabaseLogger
from bot_data.abc.user_message_count_per_hour_repository_abc import UserMessageCountPerHourRepositoryABC
from bot_data.abc.user_repository_abc import UserRepositoryABC
from bot_data.model.user_message_count_per_hour import UserMessageCountPerHour
class UserMessageCountPerHourRepositoryService(UserMessageCountPerHourRepositoryABC):
def __init__(
self,
logger: DatabaseLogger,
db_context: DatabaseContextABC,
users: UserRepositoryABC,
):
UserMessageCountPerHourRepositoryABC.__init__(self)
self._logger = logger
self._context = db_context
self._users = users
@staticmethod
def _get_value_from_result(value: any) -> Optional[any]:
if isinstance(value, str) and 'NULL' in value:
return None
return value
def _from_result(self, result: tuple) -> UserMessageCountPerHour:
return UserMessageCountPerHour(
self._get_value_from_result(result[1]),
self._get_value_from_result(result[2]),
self._get_value_from_result(result[3]),
self._users.get_user_by_id(self._get_value_from_result(result[4])),
self._get_value_from_result(result[5]),
self._get_value_from_result(result[6]),
id=self._get_value_from_result(result[0])
)
def get_user_message_count_per_hours(self) -> List[UserMessageCountPerHour]:
umcphs = List(UserMessageCountPerHour)
self._logger.trace(__name__, f'Send SQL command: {UserMessageCountPerHour.get_select_all_string()}')
results = self._context.select(UserMessageCountPerHour.get_select_all_string())
for result in results:
self._logger.trace(__name__, f'Get user message count per hour with id {result[0]}')
umcphs.append(self._from_result(result))
return umcphs
def find_user_message_count_per_hour_by_user_id(self, user_id: int) -> List[Optional[UserMessageCountPerHour]]:
umcphs = List(UserMessageCountPerHour)
sql = UserMessageCountPerHour.get_select_by_user_id_string(user_id)
self._logger.trace(__name__, f'Send SQL command: {sql}')
results = self._context.select(sql)
if results is None or len(results) == 0:
return umcphs
for result in results:
self._logger.trace(__name__, f'Get user message count per hour with id {result[0]}')
umcphs.append(self._from_result(result))
return umcphs
def get_user_message_count_per_hour_by_user_id_and_date(self, user_id: int, date: datetime) -> \
UserMessageCountPerHour:
sql = UserMessageCountPerHour.get_select_by_user_id_and_date_string(user_id, date)
self._logger.trace(__name__, f'Send SQL command: {sql}')
result = self._context.select(sql)[0]
return self._from_result(result)
def find_user_message_count_per_hour_by_user_id_and_date(self, user_id: int, date: datetime) -> \
Optional[UserMessageCountPerHour]:
sql = UserMessageCountPerHour.get_select_by_user_id_and_date_string(user_id, date)
self._logger.trace(__name__, f'Send SQL command: {sql}')
result = self._context.select(sql)
if result is None or len(result) == 0:
return None
return self._from_result(result[0])
def add_user_message_count_per_hour(self, umcph: UserMessageCountPerHour):
self._logger.trace(__name__, f'Send SQL command: {umcph.insert_string}')
self._context.cursor.execute(umcph.insert_string)
def update_user_message_count_per_hour(self, umcph: UserMessageCountPerHour):
self._logger.trace(__name__, f'Send SQL command: {umcph.udpate_string}')
self._context.cursor.execute(umcph.udpate_string)
def delete_user_message_count_per_hour(self, umcph: UserMessageCountPerHour):
self._logger.trace(__name__, f'Send SQL command: {umcph.delete_string}')
self._context.cursor.execute(umcph.delete_string)
def delete_user_message_count_per_hour_by_user_id(self, user_id: int):
sql = UserMessageCountPerHour.delete_by_user_id_string(user_id)
self._logger.trace(__name__, f'Send SQL command: {sql}')
self._context.cursor.execute(sql)