Compare commits

...

3 Commits

20 changed files with 360 additions and 643 deletions

View File

@ -48,66 +48,60 @@ class AchievementRepositoryService(AchievementRepositoryABC):
)
def get_achievements(self) -> List[Achievement]:
achievements = List(Achievement)
self._logger.trace(__name__, f"Send SQL command: {Achievement.get_select_all_string()}")
results = self._context.select(Achievement.get_select_all_string())
for result in results:
self._logger.trace(__name__, f"Get user with id {result[0]}")
achievements.append(self._from_result(result))
return achievements
return List(
Achievement,
[self._from_result(result) for result in self._context.select(Achievement.get_select_all_string())],
)
def get_achievement_by_id(self, id: int) -> Achievement:
self._logger.trace(__name__, f"Send SQL command: {Achievement.get_select_by_id_string(id)}")
result = self._context.select(Achievement.get_select_by_id_string(id))[0]
return self._from_result(result)
return self._from_result(self._context.select(Achievement.get_select_by_id_string(id))[0])
def get_achievements_by_server_id(self, server_id: int) -> List[Achievement]:
achievements = List(Achievement)
self._logger.trace(
__name__,
f"Send SQL command: {Achievement.get_select_by_server_id_string(server_id)}",
)
results = self._context.select(Achievement.get_select_by_server_id_string(server_id))
for result in results:
self._logger.trace(__name__, f"Get user with id {result[0]}")
achievements.append(self._from_result(result))
return achievements
return List(
Achievement,
[
self._from_result(result)
for result in self._context.select(Achievement.get_select_by_server_id_string(server_id))
],
)
def get_achievements_by_user_id(self, user_id: int) -> List[Achievement]:
achievements = List(Achievement)
achievements_joins = List(UserGotAchievement)
self._logger.trace(
__name__,
f"Send SQL command: {UserGotAchievement.get_select_by_user_id_string(user_id)}",
)
results = self._context.select(UserGotAchievement.get_select_by_user_id_string(user_id))
for result in results:
self._logger.trace(__name__, f"Got UserGotAchievement with id {result[0]}")
achievements_joins.append(self._join_from_result(result))
for achievements_join in achievements_joins:
results = self._context.select(Achievement.get_select_by_id_string(achievements_join.achievement.id))
for result in results:
self._logger.trace(__name__, f"Got Achievement with id {result[0]}")
achievements.append(self._from_result(result))
return achievements
return List(
UserGotAchievement,
[
self._join_from_result(result).achievement
for result in self._context.select(UserGotAchievement.get_select_by_user_id_string(user_id))
],
)
def get_user_got_achievements_by_achievement_id(self, achievement_id: int) -> List[Achievement]:
achievements_joins = List(UserGotAchievement)
self._logger.trace(
__name__,
f"Send SQL command: {UserGotAchievement.get_select_by_achievement_id_string(achievement_id)}",
)
results = self._context.select(UserGotAchievement.get_select_by_achievement_id_string(achievement_id))
for result in results:
self._logger.trace(__name__, f"Got UserGotAchievement with id {result[0]}")
achievements_joins.append(self._join_from_result(result))
return achievements_joins
return List(
UserGotAchievement,
[
self._join_from_result(result)
for result in self._context.select(
UserGotAchievement.get_select_by_achievement_id_string(achievement_id)
)
],
)
def add_achievement(self, achievement: Achievement):
self._logger.trace(__name__, f"Send SQL command: {achievement.insert_string}")

View File

@ -44,13 +44,12 @@ class ApiKeyRepositoryService(ApiKeyRepositoryABC):
return api_key
def get_api_keys(self) -> List[ApiKey]:
api_keys = List(ApiKey)
self._logger.trace(__name__, f"Send SQL command: {ApiKey.get_select_all_string()}")
results = self._context.select(ApiKey.get_select_all_string())
for result in results:
api_keys.append(self._api_key_from_result(result))
return api_keys
return List(
ApiKey,
[self._api_key_from_result(result) for result in self._context.select(ApiKey.get_select_all_string())],
)
def get_api_key(self, identifier: str, key: str) -> ApiKey:
self._logger.trace(__name__, f"Send SQL command: {ApiKey.get_select_string(identifier, key)}")

View File

@ -64,23 +64,16 @@ class AuthUserRepositoryService(AuthUserRepositoryABC):
__name__,
f"Send SQL command: {auth_user.get_select_user_id_from_relations()}",
)
relation_ids = List(int)
results = self._context.select(auth_user.get_select_user_id_from_relations())
for result in results:
self._logger.trace(__name__, f"Got auth user relation with id {result[0]}")
relation_ids.append(result[0])
return relation_ids
return List(int, [result[0] for result in self._context.select(auth_user.get_select_user_id_from_relations())])
def get_all_auth_users(self) -> List[AuthUser]:
users = List(AuthUser)
self._logger.trace(__name__, f"Send SQL command: {AuthUser.get_select_all_string()}")
results = self._context.select(AuthUser.get_select_all_string())
for result in results:
self._logger.trace(__name__, f"Get auth user with id {result[0]}")
users.append(self._user_from_result(result))
return users
return List(
AuthUser,
[self._user_from_result(result) for result in self._context.select(AuthUser.get_select_all_string())],
)
def get_filtered_auth_users(self, criteria: AuthUserSelectCriteria) -> FilteredResult:
users = self.get_all_auth_users()
@ -129,8 +122,7 @@ class AuthUserRepositoryService(AuthUserRepositoryABC):
def get_auth_user_by_email(self, email: str) -> AuthUser:
self._logger.trace(__name__, f"Send SQL command: {AuthUser.get_select_by_email_string(email)}")
result = self._context.select(AuthUser.get_select_by_email_string(email))[0]
return self._user_from_result(result)
return self._user_from_result(self._context.select(AuthUser.get_select_by_email_string(email))[0])
def find_auth_user_by_email(self, email: str) -> Optional[AuthUser]:
self._logger.trace(__name__, f"Send SQL command: {AuthUser.get_select_by_email_string(email)}")
@ -138,9 +130,7 @@ class AuthUserRepositoryService(AuthUserRepositoryABC):
if result is None or len(result) == 0:
return None
result = result[0]
return self._user_from_result(result)
return self._user_from_result(result[0])
def find_auth_user_by_confirmation_id(self, id: str) -> Optional[AuthUser]:
self._logger.trace(
@ -151,9 +141,7 @@ class AuthUserRepositoryService(AuthUserRepositoryABC):
if result is None or len(result) == 0:
return None
result = result[0]
return self._user_from_result(result)
return self._user_from_result(result[0])
def find_auth_user_by_forgot_password_id(self, id: str) -> Optional[AuthUser]:
self._logger.trace(
@ -164,9 +152,7 @@ class AuthUserRepositoryService(AuthUserRepositoryABC):
if result is None or len(result) == 0:
return None
result = result[0]
return self._user_from_result(result)
return self._user_from_result(result[0])
def add_auth_user(self, user: AuthUser):
self._logger.trace(__name__, f"Send SQL command: {user.insert_string}")

View File

@ -23,27 +23,7 @@ class AutoRoleRepositoryService(AutoRoleRepositoryABC):
AutoRoleRepositoryABC.__init__(self)
def get_auto_roles(self) -> List[AutoRole]:
auto_roles = List(AutoRole)
self._logger.trace(__name__, f"Send SQL command: {AutoRole.get_select_all_string()}")
results = self._context.select(AutoRole.get_select_all_string())
for result in results:
auto_roles.append(
AutoRole(
self._servers.get_server_by_id(result[1]),
result[2],
result[3],
result[4],
result[5],
id=result[0],
)
)
return auto_roles
def get_auto_role_by_id(self, id: int) -> AutoRole:
self._logger.trace(__name__, f"Send SQL command: {AutoRole.get_select_by_id_string(id)}")
result = self._context.select(AutoRole.get_select_by_id_string(id))[0]
def _from_result(self, result: tuple) -> AutoRole:
return AutoRole(
self._servers.get_server_by_id(result[1]),
result[2],
@ -53,55 +33,37 @@ class AutoRoleRepositoryService(AutoRoleRepositoryABC):
id=result[0],
)
def get_auto_roles(self) -> List[AutoRole]:
self._logger.trace(__name__, f"Send SQL command: {AutoRole.get_select_all_string()}")
return List(
AutoRole, [self._from_result(result) for result in self._context.select(AutoRole.get_select_all_string())]
)
def get_auto_role_by_id(self, id: int) -> AutoRole:
self._logger.trace(__name__, f"Send SQL command: {AutoRole.get_select_by_id_string(id)}")
return self._from_result(self._context.select(AutoRole.get_select_by_id_string(id))[0])
def find_auto_role_by_id(self, id: int) -> Optional[AutoRole]:
self._logger.trace(__name__, f"Send SQL command: {AutoRole.get_select_by_id_string(id)}")
result = self._context.select(AutoRole.get_select_by_id_string(id))
if result is None or len(result) == 0:
return None
result = result[0]
return AutoRole(
self._servers.get_server_by_id(result[1]),
result[2],
result[3],
result[4],
result[5],
id=result[0],
)
return self._from_result(result[0])
def get_auto_roles_by_server_id(self, id: int) -> List[AutoRole]:
self._logger.trace(__name__, f"Send SQL command: {AutoRole.get_select_by_server_id_string(id)}")
auto_roles = List(AutoRole)
results = self._context.select(AutoRole.get_select_by_server_id_string(id))
for result in results:
auto_roles.append(
AutoRole(
self._servers.get_server_by_id(result[1]),
result[2],
result[3],
result[4],
result[5],
id=result[0],
)
)
return auto_roles
return List(
AutoRole,
[self._from_result(result) for result in self._context.select(AutoRole.get_select_by_server_id_string(id))],
)
def get_auto_role_by_message_id(self, id: int) -> AutoRole:
self._logger.trace(
__name__,
f"Send SQL command: {AutoRole.get_select_by_message_id_string(id)}",
)
result = self._context.select(AutoRole.get_select_by_message_id_string(id))[0]
return AutoRole(
self._servers.get_server_by_id(result[1]),
result[2],
result[3],
result[4],
result[5],
id=result[0],
)
return self._from_result(self._context.select(AutoRole.get_select_by_message_id_string(id))[0])
def find_auto_role_by_message_id(self, id: int) -> Optional[AutoRole]:
self._logger.trace(
@ -112,16 +74,7 @@ class AutoRoleRepositoryService(AutoRoleRepositoryABC):
if result is None or len(result) == 0:
return None
result = result[0]
return AutoRole(
self._servers.get_server_by_id(result[1]),
result[2],
result[3],
result[4],
result[5],
id=result[0],
)
return self._from_result(result[0])
def add_auto_role(self, auto_role: AutoRole):
self._logger.trace(__name__, f"Send SQL command: {auto_role.insert_string}")
@ -135,27 +88,7 @@ class AutoRoleRepositoryService(AutoRoleRepositoryABC):
self._logger.trace(__name__, f"Send SQL command: {auto_role.delete_string}")
self._context.cursor.execute(auto_role.delete_string)
def get_auto_role_rules(self) -> List[AutoRoleRule]:
auto_role_rules = List(AutoRoleRule)
self._logger.trace(__name__, f"Send SQL command: {AutoRoleRule.get_select_all_string()}")
results = self._context.select(AutoRoleRule.get_select_all_string())
for result in results:
auto_role_rules.append(
AutoRoleRule(
self.get_auto_role_by_id(result[1]),
result[2],
result[3],
result[4],
result[5],
id=result[0],
)
)
return auto_role_rules
def get_auto_role_rule_by_id(self, id: int) -> AutoRoleRule:
self._logger.trace(__name__, f"Send SQL command: {AutoRoleRule.get_select_by_id_string(id)}")
result = self._context.select(AutoRoleRule.get_select_by_id_string(id))[0]
def _rule_from_result(self, result: tuple) -> AutoRoleRule:
return AutoRoleRule(
self.get_auto_role_by_id(result[1]),
result[2],
@ -165,26 +98,29 @@ class AutoRoleRepositoryService(AutoRoleRepositoryABC):
id=result[0],
)
def get_auto_role_rules(self) -> List[AutoRoleRule]:
self._logger.trace(__name__, f"Send SQL command: {AutoRoleRule.get_select_all_string()}")
return List(
AutoRoleRule,
[self._rule_from_result(result) for result in self._context.select(AutoRoleRule.get_select_all_string())],
)
def get_auto_role_rule_by_id(self, id: int) -> AutoRoleRule:
self._logger.trace(__name__, f"Send SQL command: {AutoRoleRule.get_select_by_id_string(id)}")
return self._rule_from_result(self._context.select(AutoRoleRule.get_select_by_id_string(id))[0])
def get_auto_role_rules_by_auto_role_id(self, id: int) -> List[AutoRoleRule]:
auto_role_rules = List(AutoRoleRule)
self._logger.trace(
__name__,
f"Send SQL command: {AutoRoleRule.get_select_by_auto_role_id_string(id)}",
)
results = self._context.select(AutoRoleRule.get_select_by_auto_role_id_string(id))
for result in results:
auto_role_rules.append(
AutoRoleRule(
self.get_auto_role_by_id(result[1]),
result[2],
result[3],
result[4],
result[5],
id=result[0],
)
)
return auto_role_rules
return List(
AutoRoleRule,
[
self._rule_from_result(result)
for result in self._context.select(AutoRoleRule.get_select_by_auto_role_id_string(id))
],
)
def add_auto_role_rule(self, auto_role_rule: AutoRoleRule):
self._logger.trace(__name__, f"Send SQL command: {auto_role_rule.insert_string}")

View File

@ -23,32 +23,7 @@ class ClientRepositoryService(ClientRepositoryABC):
ClientRepositoryABC.__init__(self)
def get_clients(self) -> List[Client]:
clients = List(Client)
self._logger.trace(__name__, f"Send SQL command: {Client.get_select_all_string()}")
results = self._context.select(Client.get_select_all_string())
for result in results:
self._logger.trace(__name__, f"Get client with id {result[0]}")
clients.append(
Client(
result[1],
result[2],
result[3],
result[4],
result[5],
result[6],
self._servers.get_server_by_id(result[7]),
result[8],
result[9],
id=result[0],
)
)
return clients
def get_client_by_id(self, client_id: int) -> Client:
self._logger.trace(__name__, f"Send SQL command: {Client.get_select_by_id_string(client_id)}")
result = self._context.select(Client.get_select_by_id_string(client_id))
def _from_result(self, result: tuple) -> Client:
return Client(
result[1],
result[2],
@ -62,49 +37,37 @@ class ClientRepositoryService(ClientRepositoryABC):
id=result[0],
)
def get_clients(self) -> List[Client]:
self._logger.trace(__name__, f"Send SQL command: {Client.get_select_all_string()}")
return List(
Client, [self._from_result(result) for result in self._context.select(Client.get_select_all_string())]
)
def get_client_by_id(self, client_id: int) -> Client:
self._logger.trace(__name__, f"Send SQL command: {Client.get_select_by_id_string(client_id)}")
return self._from_result(self._context.select(Client.get_select_by_id_string(client_id))[0])
def get_clients_by_server_id(self, server_id: int) -> List[Client]:
clients = List(Client)
self._logger.trace(
__name__,
f"Send SQL command: {Client.get_select_by_server_id_string(server_id)}",
)
results = self._context.select(Client.get_select_by_server_id_string(server_id))
for result in results:
clients.append(
Client(
result[1],
result[2],
result[3],
result[4],
result[5],
result[6],
self._servers.get_server_by_id(result[7]),
result[8],
result[9],
id=result[0],
)
)
return clients
return List(
Client,
[
self._from_result(result)
for result in self._context.select(Client.get_select_by_server_id_string(server_id))
],
)
def get_client_by_discord_id(self, discord_id: int) -> Client:
self._logger.trace(
__name__,
f"Send SQL command: {Client.get_select_by_discord_id_string(discord_id)}",
)
result = self._context.select(Client.get_select_by_discord_id_string(discord_id))[0]
return Client(
result[1],
result[2],
result[3],
result[4],
result[5],
result[6],
self._servers.get_server_by_id(result[7]),
result[8],
result[9],
id=result[0],
)
return self._from_result(self._context.select(Client.get_select_by_discord_id_string(discord_id))[0])
def find_client_by_discord_id(self, discord_id: int) -> Optional[Client]:
self._logger.trace(
@ -115,20 +78,7 @@ class ClientRepositoryService(ClientRepositoryABC):
if result is None or len(result) == 0:
return None
result = result[0]
return Client(
result[1],
result[2],
result[3],
result[4],
result[5],
result[6],
self._servers.get_server_by_id(result[7]),
result[8],
result[9],
id=result[0],
)
return self._from_result(result[0])
def find_client_by_server_id(self, discord_id: int) -> Optional[Client]:
self._logger.trace(
@ -139,20 +89,7 @@ class ClientRepositoryService(ClientRepositoryABC):
if result is None or len(result) == 0:
return None
result = result[0]
return Client(
result[1],
result[2],
result[3],
result[4],
result[5],
result[6],
self._servers.get_server_by_id(result[7]),
result[8],
result[9],
id=result[0],
)
return self._from_result(result[0])
def find_client_by_discord_id_and_server_id(self, discord_id: int, server_id: int) -> Optional[Client]:
self._logger.trace(
@ -163,20 +100,7 @@ class ClientRepositoryService(ClientRepositoryABC):
if result is None or len(result) == 0:
return None
result = result[0]
return Client(
result[1],
result[2],
result[3],
result[4],
result[5],
result[6],
self._servers.get_server_by_id(result[7]),
result[8],
result[9],
id=result[0],
)
return self._from_result(result[0])
def add_client(self, client: Client):
self._logger.trace(__name__, f"Send SQL command: {client.insert_string}")

View File

@ -35,49 +35,49 @@ class GameServerRepositoryService(GameServerRepositoryABC):
)
def get_game_servers(self) -> List[GameServer]:
game_servers = List(GameServer)
self._logger.trace(
__name__,
f"Send SQL command: {GameServer.get_select_all_string()}",
)
results = self._context.select(GameServer.get_select_all_string())
for result in results:
self._logger.trace(__name__, f"Get user-joined-game-server with id {result[0]}")
game_servers.append(self._from_result(result))
return game_servers
return List(
GameServer,
[self._from_result(result) for result in self._context.select(GameServer.get_select_all_string())],
)
def get_game_servers_by_server_id(self, id: int) -> List[GameServer]:
game_servers = List(GameServer)
self._logger.trace(
__name__,
f"Send SQL command: {GameServer.get_select_by_server_id_string(id)}",
)
results = self._context.select(GameServer.get_select_by_server_id_string(id))
for result in results:
self._logger.trace(__name__, f"Get user-joined-game-server with id {result[0]}")
game_servers.append(self._from_result(result))
return game_servers
return List(
GameServer,
[
self._from_result(result)
for result in self._context.select(GameServer.get_select_by_server_id_string(id))
],
)
def get_game_server_by_id(self, id: int) -> GameServer:
self._logger.trace(
__name__,
f"Send SQL command: {GameServer.get_select_by_id_string(id)}",
)
result = self._context.select(GameServer.get_select_by_id_string(id))[0]
return self._from_result(result)
return self._from_result(self._context.select(GameServer.get_select_by_id_string(id))[0])
def get_game_servers_by_api_key_id(self, id: int) -> List[GameServer]:
self._logger.trace(
__name__,
f"Send SQL command: {GameServer.get_select_by_api_key_id_string(id)}",
)
game_servers = List(GameServer)
results = self._context.select(GameServer.get_select_by_api_key_id_string(id))
for result in results:
game_servers.append(self._from_result(result))
return game_servers
return List(
GameServer,
[
self._from_result(result)
for result in self._context.select(GameServer.get_select_by_api_key_id_string(id))
],
)
def add_game_server(self, game_server: GameServer):
self._logger.trace(__name__, f"Send SQL command: {game_server.insert_string}")

View File

@ -4,8 +4,8 @@ from cpl_core.database.context import DatabaseContextABC
from cpl_query.extension import List
from bot_core.logging.database_logger import DatabaseLogger
from bot_data.abc.server_repository_abc import ServerRepositoryABC
from bot_data.abc.level_repository_abc import LevelRepositoryABC
from bot_data.abc.server_repository_abc import ServerRepositoryABC
from bot_data.model.level import Level
@ -41,20 +41,15 @@ class LevelRepositoryService(LevelRepositoryABC):
)
def get_levels(self) -> List[Level]:
levels = List(Level)
self._logger.trace(__name__, f"Send SQL command: {Level.get_select_all_string()}")
results = self._context.select(Level.get_select_all_string())
for result in results:
self._logger.trace(__name__, f"Get level with id {result[0]}")
levels.append(self._level_from_result(result))
return levels
return List(
Level, [self._level_from_result(result) for result in self._context.select(Level.get_select_all_string())]
)
def get_level_by_id(self, id: int) -> Level:
self._logger.trace(__name__, f"Send SQL command: {Level.get_select_by_id_string(id)}")
result = self._context.select(Level.get_select_by_id_string(id))[0]
return self._level_from_result(result)
return self._level_from_result(self._context.select(Level.get_select_by_id_string(id))[0])
def find_level_by_id(self, id: int) -> Optional[Level]:
self._logger.trace(__name__, f"Send SQL command: {Level.get_select_by_id_string(id)}")
@ -65,21 +60,19 @@ class LevelRepositoryService(LevelRepositoryABC):
return self._level_from_result(result[0])
def get_levels_by_server_id(self, server_id: int) -> List[Level]:
levels = List(Level)
self._logger.trace(
__name__,
f"Send SQL command: {Level.get_select_by_server_id_string(server_id)}",
)
results = self._context.select(Level.get_select_by_server_id_string(server_id))
for result in results:
self._logger.trace(__name__, f"Get level with id {result[0]}")
levels.append(self._level_from_result(result))
return levels
return List(
Level,
[
self._level_from_result(result)
for result in self._context.select(Level.get_select_by_server_id_string(server_id))
],
)
def find_levels_by_server_id(self, server_id: int) -> Optional[List[Level]]:
levels = List(Level)
self._logger.trace(
__name__,
f"Send SQL command: {Level.get_select_by_server_id_string(server_id)}",
@ -88,11 +81,10 @@ class LevelRepositoryService(LevelRepositoryABC):
if results is None or len(results) == 0:
return None
for result in results:
self._logger.trace(__name__, f"Get level with id {result[0]}")
levels.append(self._level_from_result(result))
return levels
return List(
Level,
[self._level_from_result(result) for result in results],
)
def add_level(self, level: Level):
self._logger.trace(__name__, f"Send SQL command: {level.insert_string}")

View File

@ -5,8 +5,8 @@ from cpl_query.extension import List
from discord import EntityType
from bot_core.logging.database_logger import DatabaseLogger
from bot_data.abc.server_repository_abc import ServerRepositoryABC
from bot_data.abc.scheduled_event_repository_abc import ScheduledEventRepositoryABC
from bot_data.abc.server_repository_abc import ServerRepositoryABC
from bot_data.model.scheduled_event import ScheduledEvent
from bot_data.model.scheduled_event_interval_enum import ScheduledEventIntervalEnum
@ -49,34 +49,34 @@ class ScheduledEventRepositoryService(ScheduledEventRepositoryABC):
)
def get_scheduled_events(self) -> List[ScheduledEvent]:
scheduled_events = List(ScheduledEvent)
self._logger.trace(__name__, f"Send SQL command: {ScheduledEvent.get_select_all_string()}")
results = self._context.select(ScheduledEvent.get_select_all_string())
for result in results:
self._logger.trace(__name__, f"Get scheduled_event with id {result[0]}")
scheduled_events.append(self._scheduled_event_from_result(result))
return scheduled_events
return List(
ScheduledEvent,
[
self._scheduled_event_from_result(result)
for result in self._context.select(ScheduledEvent.get_select_all_string())
],
)
def get_scheduled_event_by_id(self, id: int) -> ScheduledEvent:
self._logger.trace(__name__, f"Send SQL command: {ScheduledEvent.get_select_by_id_string(id)}")
result = self._context.select(ScheduledEvent.get_select_by_id_string(id))[0]
return self._scheduled_event_from_result(result)
return self._scheduled_event_from_result(self._context.select(ScheduledEvent.get_select_by_id_string(id))[0])
def get_scheduled_events_by_server_id(self, server_id: int) -> List[ScheduledEvent]:
scheduled_events = List(ScheduledEvent)
self._logger.trace(
__name__,
f"Send SQL command: {ScheduledEvent.get_select_by_server_id_string(server_id)}",
)
results = self._context.select(ScheduledEvent.get_select_by_server_id_string(server_id))
for result in results:
self._logger.trace(__name__, f"Get scheduled_event with id {result[0]}")
scheduled_events.append(self._scheduled_event_from_result(result))
return scheduled_events
return List(
ScheduledEvent,
[
self._scheduled_event_from_result(result)
for result in self._context.select(ScheduledEvent.get_select_by_server_id_string(server_id))
],
)
def add_scheduled_event(self, scheduled_event: ScheduledEvent):
self._logger.trace(__name__, f"Send SQL command: {scheduled_event.insert_string}")

View File

@ -26,15 +26,14 @@ class ServerConfigRepositoryService(ServerConfigRepositoryABC):
self._servers = servers
def _get_team_role_ids(self, server_id: int) -> List[ServerTeamRoleIdsConfig]:
ids = List(ServerTeamRoleIdsConfig)
self._logger.trace(
__name__,
f"Send SQL command: {ServerTeamRoleIdsConfig.get_select_by_server_id_string(server_id)}",
)
results = self._context.select(ServerTeamRoleIdsConfig.get_select_by_server_id_string(server_id))
for result in results:
self._logger.trace(__name__, f"Got ServerTeamRoleIdsConfig with id {result[0]}")
ids.append(
return List(
ServerTeamRoleIdsConfig,
[
ServerTeamRoleIdsConfig(
result[1],
TeamMemberTypeEnum(result[2]),
@ -43,22 +42,23 @@ class ServerConfigRepositoryService(ServerConfigRepositoryABC):
result[5],
id=result[0],
)
)
return ids
for result in self._context.select(ServerTeamRoleIdsConfig.get_select_by_server_id_string(server_id))
],
)
def _get_afk_channel_ids(self, server_id: int) -> List[int]:
urls = List(int)
self._logger.trace(
__name__,
f"Send SQL command: {ServerAFKChannelIdsConfig.get_select_by_server_id_string(server_id)}",
)
results = self._context.select(ServerAFKChannelIdsConfig.get_select_by_server_id_string(server_id))
for result in results:
self._logger.trace(__name__, f"Got ServerAFKChannelIdsConfig with id {result[0]}")
urls.append(result[1])
return urls
return List(
int,
[
result[1]
for result in self._context.select(ServerAFKChannelIdsConfig.get_select_by_server_id_string(server_id))
],
)
def _from_result(self, result: tuple) -> ServerConfig:
return ServerConfig(
@ -102,18 +102,14 @@ class ServerConfigRepositoryService(ServerConfigRepositoryABC):
__name__,
f"Send SQL command: {ServerConfig.get_select_by_server_id_string(server_id)}",
)
result = self._context.select(ServerConfig.get_select_by_server_id_string(server_id))[0]
return self._from_result(result)
return self._from_result(self._context.select(ServerConfig.get_select_by_server_id_string(server_id))[0])
def get_server_config_by_id(self, config_id: int) -> ServerConfig:
self._logger.trace(
__name__,
f"Send SQL command: {ServerConfig.get_select_by_id_string(config_id)}",
)
result = self._context.select(ServerConfig.get_select_by_id_string(config_id))[0]
return self._from_result(result)
return self._from_result(self._context.select(ServerConfig.get_select_by_id_string(config_id))[0])
def add_server_config(self, server_config: ServerConfig):
self._logger.trace(__name__, f"Send SQL command: {server_config.insert_string}")

View File

@ -25,12 +25,15 @@ class ServerRepositoryService(ServerRepositoryABC):
ServerRepositoryABC.__init__(self)
def get_servers(self) -> List[Server]:
servers = List(Server)
self._logger.trace(__name__, f"Send SQL command: {Server.get_select_all_string()}")
results = self._context.select(Server.get_select_all_string())
for result in results:
servers.append(Server(result[1], result[2], result[3], id=result[0]))
servers = List(
Server,
[
Server(result[1], result[2], result[3], id=result[0])
for result in self._context.select(Server.get_select_all_string())
],
)
self._cache.add_servers(servers)
return servers
@ -96,9 +99,7 @@ class ServerRepositoryService(ServerRepositoryABC):
if result is None or len(result) == 0:
return None
result = result[0]
return Server(result[1], result[2], result[3], id=result[0])
return Server(result[0][1], result[0][2], result[0][3], id=result[0][0])
def add_server(self, server: Server):
self._logger.trace(__name__, f"Send SQL command: {server.insert_string}")

View File

@ -43,14 +43,15 @@ class ShortRoleNameRepositoryService(ShortRoleNameRepositoryABC):
)
def get_short_role_names(self) -> List[ShortRoleName]:
short_role_names = List(ShortRoleName)
self._logger.trace(__name__, f"Send SQL command: {ShortRoleName.get_select_all_string()}")
results = self._context.select(ShortRoleName.get_select_all_string())
for result in results:
self._logger.trace(__name__, f"Get short_role_name with id {result[0]}")
short_role_names.append(self._short_role_name_from_result(result))
return short_role_names
return List(
ShortRoleName,
[
self._short_role_name_from_result(result)
for result in self._context.select(ShortRoleName.get_select_all_string())
],
)
def get_short_role_name_by_id(self, id: int) -> ShortRoleName:
self._logger.trace(__name__, f"Send SQL command: {ShortRoleName.get_select_by_id_string(id)}")
@ -59,31 +60,30 @@ class ShortRoleNameRepositoryService(ShortRoleNameRepositoryABC):
return self._short_role_name_from_result(result)
def find_short_role_names_by_role_id(self, role_id: int) -> List[ShortRoleName]:
short_role_names = List(ShortRoleName)
self._logger.trace(
__name__,
f"Send SQL command: {ShortRoleName.get_select_by_role_id_string(role_id)}",
)
results = self._context.select(ShortRoleName.get_select_by_role_id_string(role_id))
for result in results:
self._logger.trace(__name__, f"Get short_role_name with id {result[0]}")
short_role_names.append(self._short_role_name_from_result(result))
return short_role_names
return List(
ShortRoleName,
[
self._short_role_name_from_result(result)
for result in self._context.select(ShortRoleName.get_select_by_role_id_string(role_id))
],
)
def get_short_role_names_by_server_id(self, server_id: int) -> List[ShortRoleName]:
short_role_names = List(ShortRoleName)
self._logger.trace(
__name__,
f"Send SQL command: {ShortRoleName.get_select_by_server_id_string(server_id)}",
)
results = self._context.select(ShortRoleName.get_select_by_server_id_string(server_id))
for result in results:
self._logger.trace(__name__, f"Get short_role_name with id {result[0]}")
short_role_names.append(self._short_role_name_from_result(result))
return short_role_names
return List(
ShortRoleName,
[
self._short_role_name_from_result(result)
for result in self._context.select(ShortRoleName.get_select_by_server_id_string(server_id))
],
)
def add_short_role_name(self, short_role_name: ShortRoleName):
self._logger.trace(__name__, f"Send SQL command: {short_role_name.insert_string}")

View File

@ -42,23 +42,24 @@ class SteamSpecialOfferRepositoryService(SteamSpecialOfferRepositoryABC):
)
def get_steam_special_offers(self) -> List[SteamSpecialOffer]:
steam_special_offers = List(SteamSpecialOffer)
self._logger.trace(__name__, f"Send SQL command: {SteamSpecialOffer.get_select_all_string()}")
results = self._context.select(SteamSpecialOffer.get_select_all_string())
for result in results:
self._logger.trace(__name__, f"Get steam_special_offer with id {result[0]}")
steam_special_offers.append(self._steam_special_offer_from_result(result))
return steam_special_offers
return List(
SteamSpecialOffer,
[
self._steam_special_offer_from_result(result)
for result in self._context.select(SteamSpecialOffer.get_select_all_string())
],
)
def get_steam_special_offer_by_name(self, name: str) -> SteamSpecialOffer:
self._logger.trace(
__name__,
f"Send SQL command: {SteamSpecialOffer.get_select_by_name_string(name)}",
)
result = self._context.select(SteamSpecialOffer.get_select_by_name_string(name))[0]
return self._steam_special_offer_from_result(result)
return self._steam_special_offer_from_result(
self._context.select(SteamSpecialOffer.get_select_by_name_string(name))[0]
)
def add_steam_special_offer(self, steam_special_offer: SteamSpecialOffer):
self._logger.trace(__name__, f"Send SQL command: {steam_special_offer.insert_string}")

View File

@ -18,27 +18,17 @@ class TechnicianConfigRepositoryService(TechnicianConfigRepositoryABC):
self._context = db_context
def _get_technician_ids(self) -> List[int]:
ids = List(int)
self._logger.trace(__name__, f"Send SQL command: {TechnicianIdConfig.get_select_all_string()}")
results = self._context.select(TechnicianIdConfig.get_select_all_string())
for result in results:
self._logger.trace(__name__, f"Got TechnicianId with id {result[0]}")
ids.append(result[1])
return ids
return List(int, [config[1] for config in self._context.select(TechnicianIdConfig.get_select_all_string())])
def _get_technician_ping_urls(self) -> List[str]:
urls = List(str)
self._logger.trace(
__name__,
f"Send SQL command: {TechnicianPingUrlConfig.get_select_all_string()}",
)
results = self._context.select(TechnicianPingUrlConfig.get_select_all_string())
for result in results:
self._logger.trace(__name__, f"Got TechnicianPingUrl with id {result[0]}")
urls.append(result[1])
return urls
return List(
str, [ping_url[1] for ping_url in self._context.select(TechnicianPingUrlConfig.get_select_all_string())]
)
def _from_result(self, result: tuple) -> TechnicianConfig:
return TechnicianConfig(
@ -64,9 +54,7 @@ class TechnicianConfigRepositoryService(TechnicianConfigRepositoryABC):
def get_technician_config(self) -> TechnicianConfig:
self._logger.trace(__name__, f"Send SQL command: {TechnicianConfig.get_select_all_string()}")
result = self._context.select(TechnicianConfig.get_select_all_string())[0]
return self._from_result(result)
return self._from_result(self._context.select(TechnicianConfig.get_select_all_string())[0])
def add_technician_config(self, technician_config: TechnicianConfig):
self._logger.trace(__name__, f"Send SQL command: {technician_config.insert_string}")

View File

@ -39,46 +39,47 @@ class UserGameIdentRepositoryService(UserGameIdentRepositoryABC):
)
def get_user_game_idents(self) -> List[UserGameIdent]:
joins = List(UserGameIdent)
self._logger.trace(
__name__,
f"Send SQL command: {UserGameIdent.get_select_all_string()}",
)
results = self._context.select(UserGameIdent.get_select_all_string())
for result in results:
self._logger.trace(__name__, f"Get UserGameIdent with id {result[0]}")
joins.append(self._from_result(result))
return joins
return List(
UserGameIdent,
[
self._from_result(game_ident)
for game_ident in self._context.select(UserGameIdent.get_select_all_string())
],
)
def get_user_game_idents_by_game_server_id(self, game_server_id: int) -> List[UserGameIdent]:
joins = List(UserGameIdent)
self._logger.trace(
__name__,
f"Send SQL command: {UserGameIdent.get_select_by_game_server_id_string(game_server_id)}",
)
results = self._context.select(UserGameIdent.get_select_by_game_server_id_string(game_server_id))
for result in results:
self._logger.trace(__name__, f"Get UserGameIdent with id {result[0]}")
joins.append(self._from_result(result))
return joins
return List(
UserGameIdent,
[
self._from_result(game_ident)
for game_ident in self._context.select(
UserGameIdent.get_select_by_game_server_id_string(game_server_id)
)
],
)
def get_user_game_ident_by_id(self, id: int) -> UserGameIdent:
self._logger.trace(
__name__,
f"Send SQL command: {UserGameIdent.get_select_by_id_string(id)}",
)
result = self._context.select(UserGameIdent.get_select_by_id_string(id))[0]
return self._from_result(result)
return self._from_result(self._context.select(UserGameIdent.get_select_by_id_string(id))[0])
def get_user_game_ident_by_ident(self, ident: str) -> UserGameIdent:
self._logger.trace(
__name__,
f"Send SQL command: {UserGameIdent.get_select_by_ident_string(ident)}",
)
result = self._context.select(UserGameIdent.get_select_by_ident_string(ident))[0]
return self._from_result(result)
return self._from_result(self._context.select(UserGameIdent.get_select_by_ident_string(ident))[0])
def find_user_game_ident_by_ident(self, ident: str) -> Optional[UserGameIdent]:
self._logger.trace(
@ -89,20 +90,20 @@ class UserGameIdentRepositoryService(UserGameIdentRepositoryABC):
if len(result) == 0:
return None
result = result[0]
return self._from_result(result)
return self._from_result(result[0])
def get_user_game_idents_by_user_id(self, user_id: int) -> List[UserGameIdent]:
joins = List(UserGameIdent)
self._logger.trace(
__name__,
f"Send SQL command: {UserGameIdent.get_select_by_user_id_string(user_id)}",
)
results = self._context.select(UserGameIdent.get_select_by_user_id_string(user_id))
for result in results:
joins.append(self._from_result(result))
return joins
return List(
UserGameIdent,
[
self._from_result(game_ident)
for game_ident in self._context.select(UserGameIdent.get_select_by_user_id_string(user_id))
],
)
def add_user_game_ident(self, user_game_ident: UserGameIdent):
self._logger.trace(__name__, f"Send SQL command: {user_game_ident.insert_string}")

View File

@ -39,45 +39,45 @@ class UserJoinedGameServerRepositoryService(UserJoinedGameServerRepositoryABC):
)
def get_user_joined_game_servers(self) -> List[UserJoinedGameServer]:
joins = List(UserJoinedGameServer)
self._logger.trace(
__name__,
f"Send SQL command: {UserJoinedGameServer.get_select_all_string()}",
)
results = self._context.select(UserJoinedGameServer.get_select_all_string())
for result in results:
self._logger.trace(__name__, f"Get user-joined-game-server with id {result[0]}")
joins.append(self._from_result(result))
return joins
return List(
UserJoinedGameServer,
[self._from_result(join) for join in self._context.select(UserJoinedGameServer.get_select_all_string())],
)
def get_user_joined_game_server_by_id(self, id: int) -> UserJoinedGameServer:
self._logger.trace(
__name__,
f"Send SQL command: {UserJoinedGameServer.get_select_by_id_string(id)}",
)
result = self._context.select(UserJoinedGameServer.get_select_by_id_string(id))[0]
return self._from_result(result)
return self._from_result(self._context.select(UserJoinedGameServer.get_select_by_id_string(id))[0])
def get_user_joined_game_servers_by_user_id(self, user_id: int) -> List[UserJoinedGameServer]:
joins = List(UserJoinedGameServer)
self._logger.trace(
__name__,
f"Send SQL command: {UserJoinedGameServer.get_select_by_user_id_string(user_id)}",
)
results = self._context.select(UserJoinedGameServer.get_select_by_user_id_string(user_id))
for result in results:
joins.append(self._from_result(result))
return joins
return List(
UserJoinedGameServer,
[
self._from_result(join)
for join in self._context.select(UserJoinedGameServer.get_select_by_user_id_string(user_id))
],
)
def get_active_user_joined_game_server_by_user_id(self, user_id: int) -> UserJoinedGameServer:
self._logger.trace(
__name__,
f"Send SQL command: {UserJoinedGameServer.get_select_by_user_id_string(user_id)}",
)
result = self._context.select(UserJoinedGameServer.get_select_active_by_user_id_string(user_id))[0]
return self._from_result(result)
return self._from_result(
self._context.select(UserJoinedGameServer.get_select_active_by_user_id_string(user_id))[0]
)
def find_active_user_joined_game_server_by_user_id(self, user_id: int) -> Optional[UserJoinedGameServer]:
self._logger.trace(
@ -88,22 +88,21 @@ class UserJoinedGameServerRepositoryService(UserJoinedGameServerRepositoryABC):
if result is None or len(result) == 0:
return None
result = result[0]
return self._from_result(result)
return self._from_result(result[0])
def find_active_user_joined_game_servers_by_user_id(self, user_id: int) -> List[Optional[UserJoinedGameServer]]:
self._logger.trace(
__name__,
f"Send SQL command: {UserJoinedGameServer.get_select_active_by_user_id_string(user_id)}",
)
result = List(UserJoinedGameServer)
db_results = self._context.select(UserJoinedGameServer.get_select_active_by_user_id_string(user_id))
for db_result in db_results:
result.append(self._from_result(db_result))
return result
return List(
UserJoinedGameServer,
[
self._from_result(join)
for join in self._context.select(UserJoinedGameServer.get_select_active_by_user_id_string(user_id))
],
)
def add_user_joined_game_server(self, user_joined_game_server: UserJoinedGameServer):
self._logger.trace(__name__, f"Send SQL command: {user_joined_game_server.insert_string}")

View File

@ -23,89 +23,58 @@ class UserJoinedServerRepositoryService(UserJoinedServerRepositoryABC):
UserJoinedServerRepositoryABC.__init__(self)
def get_user_joined_servers(self) -> List[UserJoinedServer]:
joins = List(UserJoinedServer)
self._logger.trace(__name__, f"Send SQL command: {UserJoinedServer.get_select_all_string()}")
results = self._context.select(UserJoinedServer.get_select_all_string())
for result in results:
self._logger.trace(__name__, f"Get user-joined-server with id {result[0]}")
joins.append(
UserJoinedServer(
self._users.get_user_by_id(result[1]),
result[2],
result[3],
result[4],
result[5],
id=result[0],
)
)
def __from_result(self, result: tuple) -> UserJoinedServer:
return UserJoinedServer(
self._users.get_user_by_id(result[1]),
result[2],
result[3],
result[4],
result[5],
id=result[0],
)
return joins
def get_user_joined_servers(self) -> List[UserJoinedServer]:
self._logger.trace(__name__, f"Send SQL command: {UserJoinedServer.get_select_all_string()}")
return List(
UserJoinedServer,
[self.__from_result(join) for join in self._context.select(UserJoinedServer.get_select_all_string())],
)
def get_user_joined_server_by_id(self, id: int) -> UserJoinedServer:
self._logger.trace(
__name__,
f"Send SQL command: {UserJoinedServer.get_select_by_id_string(id)}",
)
result = self._context.select(UserJoinedServer.get_select_by_id_string(id))[0]
return UserJoinedServer(
self._users.get_user_by_id(result[1]),
result[2],
result[3],
result[4],
result[5],
id=result[0],
)
return self.__from_result(self._context.select(UserJoinedServer.get_select_by_id_string(id))[0])
def get_user_joined_server_by_server_id(self, server_id: int) -> UserJoinedServer:
self._logger.trace(
__name__,
f"Send SQL command: {UserJoinedServer.get(id)}",
)
result = self._context.select(UserJoinedServer.get_select_by_id_string(id))[0]
return UserJoinedServer(
self._users.get_user_by_id(result[1]),
result[2],
result[3],
result[4],
result[5],
id=result[0],
f"Send SQL command: {UserJoinedServer.get_select_by_server_id_string(server_id)}",
)
return self.__from_result(self._context.select(UserJoinedServer.get_select_by_id_string(server_id))[0])
def get_user_joined_servers_by_user_id(self, user_id: int) -> List[UserJoinedServer]:
joins = List(UserJoinedServer)
self._logger.trace(
__name__,
f"Send SQL command: {UserJoinedServer.get_select_by_user_id_string(user_id)}",
)
results = self._context.select(UserJoinedServer.get_select_by_user_id_string(user_id))
for result in results:
joins.append(
UserJoinedServer(
self._users.get_user_by_id(result[1]),
result[2],
result[3],
result[4],
result[5],
id=result[0],
)
)
return joins
return List(
UserJoinedServer,
[
self.__from_result(join)
for join in self._context.select(UserJoinedServer.get_select_by_user_id_string(user_id))
],
)
def get_active_user_joined_server_by_user_id(self, user_id: int) -> UserJoinedServer:
self._logger.trace(
__name__,
f"Send SQL command: {UserJoinedServer.get_select_by_user_id_string(user_id)}",
)
result = self._context.select(UserJoinedServer.get_select_active_by_user_id_string(user_id))[0]
return UserJoinedServer(
self._users.get_user_by_id(result[1]),
result[2],
result[3],
result[4],
result[5],
id=result[0],
return self.__from_result(
self._context.select(UserJoinedServer.get_select_active_by_user_id_string(user_id))[0]
)
def find_active_user_joined_server_by_user_id(self, user_id: int) -> Optional[UserJoinedServer]:
@ -117,16 +86,7 @@ class UserJoinedServerRepositoryService(UserJoinedServerRepositoryABC):
if result is None or len(result) == 0:
return None
result = result[0]
return UserJoinedServer(
self._users.get_user_by_id(result[1]),
result[2],
result[3],
result[4],
result[5],
id=result[0],
)
return self.__from_result(result[0])
def add_user_joined_server(self, user_joined_server: UserJoinedServer):
self._logger.trace(__name__, f"Send SQL command: {user_joined_server.insert_string}")

View File

@ -25,77 +25,55 @@ class UserJoinedVoiceChannelRepositoryService(UserJoinedVoiceChannelRepositoryAB
UserJoinedVoiceChannelRepositoryABC.__init__(self)
def __from_result(self, result: tuple) -> UserJoinedVoiceChannel:
return UserJoinedVoiceChannel(
self._users.get_user_by_id(result[1]),
result[2],
result[3],
result[4],
result[5],
id=result[0],
)
def get_user_joined_voice_channels(self) -> List[UserJoinedVoiceChannel]:
joins = List(UserJoinedVoiceChannel)
self._logger.trace(
__name__,
f"Send SQL command: {UserJoinedVoiceChannel.get_select_all_string()}",
)
results = self._context.select(UserJoinedVoiceChannel.get_select_all_string())
for result in results:
self._logger.trace(__name__, f"Get user-joined-voice-channel with id {result[0]}")
joins.append(
UserJoinedVoiceChannel(
self._users.get_user_by_id(result[1]),
result[2],
result[3],
result[4],
result[5],
id=result[0],
)
)
return joins
return List(
UserJoinedVoiceChannel,
[self.__from_result(join) for join in self._context.select(UserJoinedVoiceChannel.get_select_all_string())],
)
def get_user_joined_voice_channel_by_id(self, id: int) -> UserJoinedVoiceChannel:
self._logger.trace(
__name__,
f"Send SQL command: {UserJoinedVoiceChannel.get_select_by_id_string(id)}",
)
result = self._context.select(UserJoinedVoiceChannel.get_select_by_id_string(id))[0]
return UserJoinedVoiceChannel(
self._users.get_user_by_id(result[1]),
result[2],
result[3],
result[4],
result[5],
id=result[0],
)
return self.__from_result(self._context.select(UserJoinedVoiceChannel.get_select_by_id_string(id))[0])
def get_user_joined_voice_channels_by_user_id(self, user_id: int) -> List[UserJoinedVoiceChannel]:
joins = List(UserJoinedVoiceChannel)
self._logger.trace(
__name__,
f"Send SQL command: {UserJoinedVoiceChannel.get_select_by_user_id_string(user_id)}",
)
results = self._context.select(UserJoinedVoiceChannel.get_select_by_user_id_string(user_id))
for result in results:
joins.append(
UserJoinedVoiceChannel(
self._users.get_user_by_id(result[1]),
result[2],
result[3],
result[4],
result[5],
id=result[0],
)
)
return joins
return List(
UserJoinedVoiceChannel,
[
self.__from_result(join)
for join in self._context.select(UserJoinedVoiceChannel.get_select_by_user_id_string(user_id))
],
)
def get_active_user_joined_voice_channel_by_user_id(self, user_id: int) -> UserJoinedVoiceChannel:
self._logger.trace(
__name__,
f"Send SQL command: {UserJoinedVoiceChannel.get_select_by_user_id_string(user_id)}",
)
result = self._context.select(UserJoinedVoiceChannel.get_select_active_by_user_id_string(user_id))[0]
return UserJoinedVoiceChannel(
self._users.get_user_by_id(result[1]),
result[2],
result[3],
result[4],
result[5],
id=result[0],
return self.__from_result(
self._context.select(UserJoinedVoiceChannel.get_select_active_by_user_id_string(user_id))[0]
)
def find_active_user_joined_voice_channel_by_user_id(self, user_id: int) -> Optional[UserJoinedVoiceChannel]:
@ -107,38 +85,21 @@ class UserJoinedVoiceChannelRepositoryService(UserJoinedVoiceChannelRepositoryAB
if result is None or len(result) == 0:
return None
result = result[0]
return UserJoinedVoiceChannel(
self._users.get_user_by_id(result[1]),
result[2],
result[3],
result[4],
result[5],
id=result[0],
)
return self.__from_result(result[0])
def find_active_user_joined_voice_channels_by_user_id(self, user_id: int) -> List[Optional[UserJoinedVoiceChannel]]:
self._logger.trace(
__name__,
f"Send SQL command: {UserJoinedVoiceChannel.get_select_active_by_user_id_string(user_id)}",
)
result = List(UserJoinedVoiceChannel)
db_results = self._context.select(UserJoinedVoiceChannel.get_select_active_by_user_id_string(user_id))
for db_result in db_results:
result.append(
UserJoinedVoiceChannel(
self._users.get_user_by_id(db_result[1]),
db_result[2],
db_result[3],
db_result[4],
db_result[5],
id=db_result[0],
)
)
return result
return List(
UserJoinedVoiceChannel,
[
self.__from_result(join)
for join in self._context.select(UserJoinedVoiceChannel.get_select_active_by_user_id_string(user_id))
],
)
def add_user_joined_voice_channel(self, user_joined_voice_channel: UserJoinedVoiceChannel):
self._logger.trace(__name__, f"Send SQL command: {user_joined_voice_channel.insert_string}")

View File

@ -44,39 +44,30 @@ class UserMessageCountPerHourRepositoryService(UserMessageCountPerHourRepository
)
def get_user_message_count_per_hours(self) -> List[UserMessageCountPerHour]:
umcphs = List(UserMessageCountPerHour)
self._logger.trace(
__name__,
f"Send SQL command: {UserMessageCountPerHour.get_select_all_string()}",
)
results = self._context.select(UserMessageCountPerHour.get_select_all_string())
for result in results:
self._logger.trace(__name__, f"Get user message count per hour with id {result[0]}")
umcphs.append(self._from_result(result))
return umcphs
return List(
UserMessageCountPerHour,
[
self._from_result(umcphs)
for umcphs in self._context.select(UserMessageCountPerHour.get_select_all_string())
],
)
def find_user_message_count_per_hour_by_user_id(self, user_id: int) -> List[Optional[UserMessageCountPerHour]]:
umcphs = List(UserMessageCountPerHour)
sql = UserMessageCountPerHour.get_select_by_user_id_string(user_id)
self._logger.trace(__name__, f"Send SQL command: {sql}")
results = self._context.select(sql)
if results is None or len(results) == 0:
return umcphs
for result in results:
self._logger.trace(__name__, f"Get user message count per hour with id {result[0]}")
umcphs.append(self._from_result(result))
return umcphs
return List(UserMessageCountPerHour, [self._from_result(umcphs) for umcphs in self._context.select(sql)])
def get_user_message_count_per_hour_by_user_id_and_date(
self, user_id: int, date: datetime
) -> UserMessageCountPerHour:
sql = UserMessageCountPerHour.get_select_by_user_id_and_date_string(user_id, date)
self._logger.trace(__name__, f"Send SQL command: {sql}")
result = self._context.select(sql)[0]
return self._from_result(result)
return self._from_result(self._context.select(sql)[0])
def find_user_message_count_per_hour_by_user_id_and_date(
self, user_id: int, date: datetime

View File

@ -1,4 +1,3 @@
import datetime
from typing import Optional
from cpl_core.database.context import DatabaseContextABC
@ -38,14 +37,8 @@ class UserRepositoryService(UserRepositoryABC):
)
def get_users(self) -> List[User]:
users = List(User)
self._logger.trace(__name__, f"Send SQL command: {User.get_select_all_string()}")
results = self._context.select(User.get_select_all_string())
for result in results:
self._logger.trace(__name__, f"Get user with id {result[0]}")
users.append(self._from_result(result))
return users
return List(User, [self._from_result(user) for user in self._context.select(User.get_select_all_string())])
def get_user_by_id(self, id: int) -> User:
self._logger.trace(__name__, f"Send SQL command: {User.get_select_by_id_string(id)}")
@ -59,42 +52,40 @@ class UserRepositoryService(UserRepositoryABC):
if result is None or len(result) == 0:
return None
result = result[0]
return self._from_result(result)
return self._from_result(result[0])
def get_users_by_discord_id(self, discord_id: int) -> List[User]:
users = List(User)
self._logger.trace(
__name__,
f"Send SQL command: {User.get_select_by_discord_id_string(discord_id)}",
)
results = self._context.select(User.get_select_by_discord_id_string(discord_id))
for result in results:
users.append(self._from_result(result))
return users
return List(
User,
[
self._from_result(user)
for user in self._context.select(User.get_select_by_discord_id_string(discord_id))
],
)
def get_users_by_server_id(self, server_id: int) -> List[User]:
users = List(User)
self._logger.trace(
__name__,
f"Send SQL command: {User.get_select_by_server_id_string(server_id)}",
)
results = self._context.select(User.get_select_by_server_id_string(server_id))
for result in results:
users.append(self._from_result(result))
return users
return List(
User,
[self._from_result(user) for user in self._context.select(User.get_select_by_server_id_string(server_id))],
)
def get_user_by_discord_id_and_server_id(self, discord_id: int, server_id: int) -> User:
self._logger.trace(
__name__,
f"Send SQL command: {User.get_select_by_discord_id_and_server_id_string(discord_id, server_id)}",
)
result = self._context.select(User.get_select_by_discord_id_and_server_id_string(discord_id, server_id))[0]
return self._from_result(result)
return self._from_result(
self._context.select(User.get_select_by_discord_id_and_server_id_string(discord_id, server_id))[0]
)
def find_user_by_discord_id_and_server_id(self, discord_id: int, server_id: int) -> Optional[User]:
self._logger.trace(
@ -105,9 +96,7 @@ class UserRepositoryService(UserRepositoryABC):
if result is None or len(result) == 0:
return None
result = result[0]
return self._from_result(result)
return self._from_result(result[0])
def add_user(self, user: User):
self._logger.trace(__name__, f"Send SQL command: {user.insert_string}")

View File

@ -46,30 +46,29 @@ class UserWarningsRepositoryService(UserWarningsRepositoryABC):
)
def get_user_warnings(self) -> List[UserWarnings]:
warnings = List(UserWarnings)
self._logger.trace(__name__, f"Send SQL command: {UserWarnings.get_select_all_string()}")
results = self._context.select(UserWarnings.get_select_all_string())
for result in results:
warnings.append(self._from_result(result))
return warnings
return List(
UserWarnings,
[self._from_result(warning) for warning in self._context.select(UserWarnings.get_select_all_string())],
)
def get_user_warnings_by_id(self, id: int) -> UserWarnings:
self._logger.trace(__name__, f"Send SQL command: {UserWarnings.get_select_by_id_string(id)}")
result = self._context.select(UserWarnings.get_select_by_id_string(id))[0]
return self._from_result(result)
return self._from_result(self._context.select(UserWarnings.get_select_by_id_string(id))[0])
def get_user_warnings_by_user_id(self, user_id: int) -> List[UserWarnings]:
warnings = List(UserWarnings)
self._logger.trace(
__name__,
f"Send SQL command: {UserWarnings.get_select_by_user_id_string(user_id)}",
)
results = self._context.select(UserWarnings.get_select_by_user_id_string(user_id))
for result in results:
warnings.append(self._from_result(result))
return warnings
return List(
UserWarnings,
[
self._from_result(warning)
for warning in self._context.select(UserWarnings.get_select_by_user_id_string(user_id))
],
)
def add_user_warnings(self, user_warnings: UserWarnings):
self._logger.trace(__name__, f"Send SQL command: {user_warnings.insert_string}")