Changed normal data collection to list comprehensions p1 #446

This commit is contained in:
Sven Heidemann 2023-12-02 17:37:50 +01:00
parent a2dd447dbd
commit d927ab8fb7
6 changed files with 122 additions and 223 deletions

View File

@ -39,45 +39,45 @@ class UserJoinedGameServerRepositoryService(UserJoinedGameServerRepositoryABC):
)
def get_user_joined_game_servers(self) -> List[UserJoinedGameServer]:
joins = List(UserJoinedGameServer)
self._logger.trace(
__name__,
f"Send SQL command: {UserJoinedGameServer.get_select_all_string()}",
)
results = self._context.select(UserJoinedGameServer.get_select_all_string())
for result in results:
self._logger.trace(__name__, f"Get user-joined-game-server with id {result[0]}")
joins.append(self._from_result(result))
return joins
return List(
UserJoinedGameServer,
[self._from_result(join) for join in self._context.select(UserJoinedGameServer.get_select_all_string())],
)
def get_user_joined_game_server_by_id(self, id: int) -> UserJoinedGameServer:
self._logger.trace(
__name__,
f"Send SQL command: {UserJoinedGameServer.get_select_by_id_string(id)}",
)
result = self._context.select(UserJoinedGameServer.get_select_by_id_string(id))[0]
return self._from_result(result)
return self._from_result(self._context.select(UserJoinedGameServer.get_select_by_id_string(id))[0])
def get_user_joined_game_servers_by_user_id(self, user_id: int) -> List[UserJoinedGameServer]:
joins = List(UserJoinedGameServer)
self._logger.trace(
__name__,
f"Send SQL command: {UserJoinedGameServer.get_select_by_user_id_string(user_id)}",
)
results = self._context.select(UserJoinedGameServer.get_select_by_user_id_string(user_id))
for result in results:
joins.append(self._from_result(result))
return joins
return List(
UserJoinedGameServer,
[
self._from_result(join)
for join in self._context.select(UserJoinedGameServer.get_select_by_user_id_string(user_id))
],
)
def get_active_user_joined_game_server_by_user_id(self, user_id: int) -> UserJoinedGameServer:
self._logger.trace(
__name__,
f"Send SQL command: {UserJoinedGameServer.get_select_by_user_id_string(user_id)}",
)
result = self._context.select(UserJoinedGameServer.get_select_active_by_user_id_string(user_id))[0]
return self._from_result(result)
return self._from_result(
self._context.select(UserJoinedGameServer.get_select_active_by_user_id_string(user_id))[0]
)
def find_active_user_joined_game_server_by_user_id(self, user_id: int) -> Optional[UserJoinedGameServer]:
self._logger.trace(
@ -88,22 +88,21 @@ class UserJoinedGameServerRepositoryService(UserJoinedGameServerRepositoryABC):
if result is None or len(result) == 0:
return None
result = result[0]
return self._from_result(result)
return self._from_result(result[0])
def find_active_user_joined_game_servers_by_user_id(self, user_id: int) -> List[Optional[UserJoinedGameServer]]:
self._logger.trace(
__name__,
f"Send SQL command: {UserJoinedGameServer.get_select_active_by_user_id_string(user_id)}",
)
result = List(UserJoinedGameServer)
db_results = self._context.select(UserJoinedGameServer.get_select_active_by_user_id_string(user_id))
for db_result in db_results:
result.append(self._from_result(db_result))
return result
return List(
UserJoinedGameServer,
[
self._from_result(join)
for join in self._context.select(UserJoinedGameServer.get_select_active_by_user_id_string(user_id))
],
)
def add_user_joined_game_server(self, user_joined_game_server: UserJoinedGameServer):
self._logger.trace(__name__, f"Send SQL command: {user_joined_game_server.insert_string}")

View File

@ -23,89 +23,58 @@ class UserJoinedServerRepositoryService(UserJoinedServerRepositoryABC):
UserJoinedServerRepositoryABC.__init__(self)
def get_user_joined_servers(self) -> List[UserJoinedServer]:
joins = List(UserJoinedServer)
self._logger.trace(__name__, f"Send SQL command: {UserJoinedServer.get_select_all_string()}")
results = self._context.select(UserJoinedServer.get_select_all_string())
for result in results:
self._logger.trace(__name__, f"Get user-joined-server with id {result[0]}")
joins.append(
UserJoinedServer(
self._users.get_user_by_id(result[1]),
result[2],
result[3],
result[4],
result[5],
id=result[0],
)
)
def __from_result(self, result: tuple) -> UserJoinedServer:
return UserJoinedServer(
self._users.get_user_by_id(result[1]),
result[2],
result[3],
result[4],
result[5],
id=result[0],
)
return joins
def get_user_joined_servers(self) -> List[UserJoinedServer]:
self._logger.trace(__name__, f"Send SQL command: {UserJoinedServer.get_select_all_string()}")
return List(
UserJoinedServer,
[self.__from_result(join) for join in self._context.select(UserJoinedServer.get_select_all_string())],
)
def get_user_joined_server_by_id(self, id: int) -> UserJoinedServer:
self._logger.trace(
__name__,
f"Send SQL command: {UserJoinedServer.get_select_by_id_string(id)}",
)
result = self._context.select(UserJoinedServer.get_select_by_id_string(id))[0]
return UserJoinedServer(
self._users.get_user_by_id(result[1]),
result[2],
result[3],
result[4],
result[5],
id=result[0],
)
return self.__from_result(self._context.select(UserJoinedServer.get_select_by_id_string(id))[0])
def get_user_joined_server_by_server_id(self, server_id: int) -> UserJoinedServer:
self._logger.trace(
__name__,
f"Send SQL command: {UserJoinedServer.get(id)}",
)
result = self._context.select(UserJoinedServer.get_select_by_id_string(id))[0]
return UserJoinedServer(
self._users.get_user_by_id(result[1]),
result[2],
result[3],
result[4],
result[5],
id=result[0],
f"Send SQL command: {UserJoinedServer.get_select_by_server_id_string(server_id)}",
)
return self.__from_result(self._context.select(UserJoinedServer.get_select_by_id_string(server_id))[0])
def get_user_joined_servers_by_user_id(self, user_id: int) -> List[UserJoinedServer]:
joins = List(UserJoinedServer)
self._logger.trace(
__name__,
f"Send SQL command: {UserJoinedServer.get_select_by_user_id_string(user_id)}",
)
results = self._context.select(UserJoinedServer.get_select_by_user_id_string(user_id))
for result in results:
joins.append(
UserJoinedServer(
self._users.get_user_by_id(result[1]),
result[2],
result[3],
result[4],
result[5],
id=result[0],
)
)
return joins
return List(
UserJoinedServer,
[
self.__from_result(join)
for join in self._context.select(UserJoinedServer.get_select_by_user_id_string(user_id))
],
)
def get_active_user_joined_server_by_user_id(self, user_id: int) -> UserJoinedServer:
self._logger.trace(
__name__,
f"Send SQL command: {UserJoinedServer.get_select_by_user_id_string(user_id)}",
)
result = self._context.select(UserJoinedServer.get_select_active_by_user_id_string(user_id))[0]
return UserJoinedServer(
self._users.get_user_by_id(result[1]),
result[2],
result[3],
result[4],
result[5],
id=result[0],
return self.__from_result(
self._context.select(UserJoinedServer.get_select_active_by_user_id_string(user_id))[0]
)
def find_active_user_joined_server_by_user_id(self, user_id: int) -> Optional[UserJoinedServer]:
@ -117,16 +86,7 @@ class UserJoinedServerRepositoryService(UserJoinedServerRepositoryABC):
if result is None or len(result) == 0:
return None
result = result[0]
return UserJoinedServer(
self._users.get_user_by_id(result[1]),
result[2],
result[3],
result[4],
result[5],
id=result[0],
)
return self.__from_result(result[0])
def add_user_joined_server(self, user_joined_server: UserJoinedServer):
self._logger.trace(__name__, f"Send SQL command: {user_joined_server.insert_string}")

View File

@ -25,77 +25,55 @@ class UserJoinedVoiceChannelRepositoryService(UserJoinedVoiceChannelRepositoryAB
UserJoinedVoiceChannelRepositoryABC.__init__(self)
def __from_result(self, result: tuple) -> UserJoinedVoiceChannel:
return UserJoinedVoiceChannel(
self._users.get_user_by_id(result[1]),
result[2],
result[3],
result[4],
result[5],
id=result[0],
)
def get_user_joined_voice_channels(self) -> List[UserJoinedVoiceChannel]:
joins = List(UserJoinedVoiceChannel)
self._logger.trace(
__name__,
f"Send SQL command: {UserJoinedVoiceChannel.get_select_all_string()}",
)
results = self._context.select(UserJoinedVoiceChannel.get_select_all_string())
for result in results:
self._logger.trace(__name__, f"Get user-joined-voice-channel with id {result[0]}")
joins.append(
UserJoinedVoiceChannel(
self._users.get_user_by_id(result[1]),
result[2],
result[3],
result[4],
result[5],
id=result[0],
)
)
return joins
return List(
UserJoinedVoiceChannel,
[self.__from_result(join) for join in self._context.select(UserJoinedVoiceChannel.get_select_all_string())],
)
def get_user_joined_voice_channel_by_id(self, id: int) -> UserJoinedVoiceChannel:
self._logger.trace(
__name__,
f"Send SQL command: {UserJoinedVoiceChannel.get_select_by_id_string(id)}",
)
result = self._context.select(UserJoinedVoiceChannel.get_select_by_id_string(id))[0]
return UserJoinedVoiceChannel(
self._users.get_user_by_id(result[1]),
result[2],
result[3],
result[4],
result[5],
id=result[0],
)
return self.__from_result(self._context.select(UserJoinedVoiceChannel.get_select_by_id_string(id))[0])
def get_user_joined_voice_channels_by_user_id(self, user_id: int) -> List[UserJoinedVoiceChannel]:
joins = List(UserJoinedVoiceChannel)
self._logger.trace(
__name__,
f"Send SQL command: {UserJoinedVoiceChannel.get_select_by_user_id_string(user_id)}",
)
results = self._context.select(UserJoinedVoiceChannel.get_select_by_user_id_string(user_id))
for result in results:
joins.append(
UserJoinedVoiceChannel(
self._users.get_user_by_id(result[1]),
result[2],
result[3],
result[4],
result[5],
id=result[0],
)
)
return joins
return List(
UserJoinedVoiceChannel,
[
self.__from_result(join)
for join in self._context.select(UserJoinedVoiceChannel.get_select_by_user_id_string(user_id))
],
)
def get_active_user_joined_voice_channel_by_user_id(self, user_id: int) -> UserJoinedVoiceChannel:
self._logger.trace(
__name__,
f"Send SQL command: {UserJoinedVoiceChannel.get_select_by_user_id_string(user_id)}",
)
result = self._context.select(UserJoinedVoiceChannel.get_select_active_by_user_id_string(user_id))[0]
return UserJoinedVoiceChannel(
self._users.get_user_by_id(result[1]),
result[2],
result[3],
result[4],
result[5],
id=result[0],
return self.__from_result(
self._context.select(UserJoinedVoiceChannel.get_select_active_by_user_id_string(user_id))[0]
)
def find_active_user_joined_voice_channel_by_user_id(self, user_id: int) -> Optional[UserJoinedVoiceChannel]:
@ -107,38 +85,21 @@ class UserJoinedVoiceChannelRepositoryService(UserJoinedVoiceChannelRepositoryAB
if result is None or len(result) == 0:
return None
result = result[0]
return UserJoinedVoiceChannel(
self._users.get_user_by_id(result[1]),
result[2],
result[3],
result[4],
result[5],
id=result[0],
)
return self.__from_result(result[0])
def find_active_user_joined_voice_channels_by_user_id(self, user_id: int) -> List[Optional[UserJoinedVoiceChannel]]:
self._logger.trace(
__name__,
f"Send SQL command: {UserJoinedVoiceChannel.get_select_active_by_user_id_string(user_id)}",
)
result = List(UserJoinedVoiceChannel)
db_results = self._context.select(UserJoinedVoiceChannel.get_select_active_by_user_id_string(user_id))
for db_result in db_results:
result.append(
UserJoinedVoiceChannel(
self._users.get_user_by_id(db_result[1]),
db_result[2],
db_result[3],
db_result[4],
db_result[5],
id=db_result[0],
)
)
return result
return List(
UserJoinedVoiceChannel,
[
self.__from_result(join)
for join in self._context.select(UserJoinedVoiceChannel.get_select_active_by_user_id_string(user_id))
],
)
def add_user_joined_voice_channel(self, user_joined_voice_channel: UserJoinedVoiceChannel):
self._logger.trace(__name__, f"Send SQL command: {user_joined_voice_channel.insert_string}")

View File

@ -44,39 +44,30 @@ class UserMessageCountPerHourRepositoryService(UserMessageCountPerHourRepository
)
def get_user_message_count_per_hours(self) -> List[UserMessageCountPerHour]:
umcphs = List(UserMessageCountPerHour)
self._logger.trace(
__name__,
f"Send SQL command: {UserMessageCountPerHour.get_select_all_string()}",
)
results = self._context.select(UserMessageCountPerHour.get_select_all_string())
for result in results:
self._logger.trace(__name__, f"Get user message count per hour with id {result[0]}")
umcphs.append(self._from_result(result))
return umcphs
return List(
UserMessageCountPerHour,
[
self._from_result(umcphs)
for umcphs in self._context.select(UserMessageCountPerHour.get_select_all_string())
],
)
def find_user_message_count_per_hour_by_user_id(self, user_id: int) -> List[Optional[UserMessageCountPerHour]]:
umcphs = List(UserMessageCountPerHour)
sql = UserMessageCountPerHour.get_select_by_user_id_string(user_id)
self._logger.trace(__name__, f"Send SQL command: {sql}")
results = self._context.select(sql)
if results is None or len(results) == 0:
return umcphs
for result in results:
self._logger.trace(__name__, f"Get user message count per hour with id {result[0]}")
umcphs.append(self._from_result(result))
return umcphs
return List(UserMessageCountPerHour, [self._from_result(umcphs) for umcphs in self._context.select(sql)])
def get_user_message_count_per_hour_by_user_id_and_date(
self, user_id: int, date: datetime
) -> UserMessageCountPerHour:
sql = UserMessageCountPerHour.get_select_by_user_id_and_date_string(user_id, date)
self._logger.trace(__name__, f"Send SQL command: {sql}")
result = self._context.select(sql)[0]
return self._from_result(result)
return self._from_result(self._context.select(sql)[0])
def find_user_message_count_per_hour_by_user_id_and_date(
self, user_id: int, date: datetime

View File

@ -1,4 +1,3 @@
import datetime
from typing import Optional
from cpl_core.database.context import DatabaseContextABC
@ -38,14 +37,8 @@ class UserRepositoryService(UserRepositoryABC):
)
def get_users(self) -> List[User]:
users = List(User)
self._logger.trace(__name__, f"Send SQL command: {User.get_select_all_string()}")
results = self._context.select(User.get_select_all_string())
for result in results:
self._logger.trace(__name__, f"Get user with id {result[0]}")
users.append(self._from_result(result))
return users
return List(User, [self._from_result(user) for user in self._context.select(User.get_select_all_string())])
def get_user_by_id(self, id: int) -> User:
self._logger.trace(__name__, f"Send SQL command: {User.get_select_by_id_string(id)}")
@ -59,42 +52,40 @@ class UserRepositoryService(UserRepositoryABC):
if result is None or len(result) == 0:
return None
result = result[0]
return self._from_result(result)
return self._from_result(result[0])
def get_users_by_discord_id(self, discord_id: int) -> List[User]:
users = List(User)
self._logger.trace(
__name__,
f"Send SQL command: {User.get_select_by_discord_id_string(discord_id)}",
)
results = self._context.select(User.get_select_by_discord_id_string(discord_id))
for result in results:
users.append(self._from_result(result))
return users
return List(
User,
[
self._from_result(user)
for user in self._context.select(User.get_select_by_discord_id_string(discord_id))
],
)
def get_users_by_server_id(self, server_id: int) -> List[User]:
users = List(User)
self._logger.trace(
__name__,
f"Send SQL command: {User.get_select_by_server_id_string(server_id)}",
)
results = self._context.select(User.get_select_by_server_id_string(server_id))
for result in results:
users.append(self._from_result(result))
return users
return List(
User,
[self._from_result(user) for user in self._context.select(User.get_select_by_server_id_string(server_id))],
)
def get_user_by_discord_id_and_server_id(self, discord_id: int, server_id: int) -> User:
self._logger.trace(
__name__,
f"Send SQL command: {User.get_select_by_discord_id_and_server_id_string(discord_id, server_id)}",
)
result = self._context.select(User.get_select_by_discord_id_and_server_id_string(discord_id, server_id))[0]
return self._from_result(result)
return self._from_result(
self._context.select(User.get_select_by_discord_id_and_server_id_string(discord_id, server_id))[0]
)
def find_user_by_discord_id_and_server_id(self, discord_id: int, server_id: int) -> Optional[User]:
self._logger.trace(
@ -105,9 +96,7 @@ class UserRepositoryService(UserRepositoryABC):
if result is None or len(result) == 0:
return None
result = result[0]
return self._from_result(result)
return self._from_result(result[0])
def add_user(self, user: User):
self._logger.trace(__name__, f"Send SQL command: {user.insert_string}")

View File

@ -46,30 +46,29 @@ class UserWarningsRepositoryService(UserWarningsRepositoryABC):
)
def get_user_warnings(self) -> List[UserWarnings]:
warnings = List(UserWarnings)
self._logger.trace(__name__, f"Send SQL command: {UserWarnings.get_select_all_string()}")
results = self._context.select(UserWarnings.get_select_all_string())
for result in results:
warnings.append(self._from_result(result))
return warnings
return List(
UserWarnings,
[self._from_result(warning) for warning in self._context.select(UserWarnings.get_select_all_string())],
)
def get_user_warnings_by_id(self, id: int) -> UserWarnings:
self._logger.trace(__name__, f"Send SQL command: {UserWarnings.get_select_by_id_string(id)}")
result = self._context.select(UserWarnings.get_select_by_id_string(id))[0]
return self._from_result(result)
return self._from_result(self._context.select(UserWarnings.get_select_by_id_string(id))[0])
def get_user_warnings_by_user_id(self, user_id: int) -> List[UserWarnings]:
warnings = List(UserWarnings)
self._logger.trace(
__name__,
f"Send SQL command: {UserWarnings.get_select_by_user_id_string(user_id)}",
)
results = self._context.select(UserWarnings.get_select_by_user_id_string(user_id))
for result in results:
warnings.append(self._from_result(result))
return warnings
return List(
UserWarnings,
[
self._from_result(warning)
for warning in self._context.select(UserWarnings.get_select_by_user_id_string(user_id))
],
)
def add_user_warnings(self, user_warnings: UserWarnings):
self._logger.trace(__name__, f"Send SQL command: {user_warnings.insert_string}")