3 Commits

Author SHA1 Message Date
db61a764eb Merge pull request 'staging' (#447) from staging into master
All checks were successful
Deploy prod on push / on-push-deploy_sh-edraft (push) Successful in 3m45s
Reviewed-on: #447
2023-12-02 19:34:22 +01:00
919eef79f6 Merge branch 'master' into staging
All checks were successful
Deploy staging on push / on-push-deploy_sh-edraft (push) Successful in 3m58s
2023-12-02 19:33:58 +01:00
5de6710261 Merge pull request 'staging' (#435) from staging into master
All checks were successful
Deploy prod on push / on-push-deploy_sh-edraft (push) Successful in 3m34s
Reviewed-on: #435
2023-11-07 20:37:13 +01:00
28 changed files with 1009 additions and 619 deletions

View File

@@ -48,16 +48,6 @@ class ClientUtilsABC(ABC):
def get_auto_complete_list(self, _l: List, current: str, select: Callable = None) -> List:
pass
@abstractmethod
def update_user_message_xp_count_by_hour(
self,
created_at: datetime,
user: User,
settings: ServerConfig,
is_reaction: bool = False,
):
pass
@abstractmethod
def is_message_xp_count_by_hour_higher_that_max_message_count_per_hour(
self,

View File

@@ -143,13 +143,14 @@ class ClientUtilsService(ClientUtilsABC):
return _l.take(25)
def update_user_message_xp_count_by_hour(
def is_message_xp_count_by_hour_higher_that_max_message_count_per_hour(
self,
created_at: datetime,
user: User,
settings: ServerConfig,
is_reaction: bool = False,
):
) -> bool:
umcph = None
try:
umcph = self._umcphs.find_user_message_count_per_hour_by_user_id_and_date(user.id, created_at)
if umcph is None:
@@ -161,50 +162,44 @@ class ClientUtilsService(ClientUtilsABC):
user,
)
)
self._db.save_changes()
umcph = self._umcphs.get_user_message_count_per_hour_by_user_id_and_date(user.id, created_at)
umcph.xp_count += settings.xp_per_reaction if is_reaction else settings.xp_per_message
self._db.save_changes()
umcph = self._umcphs.get_user_message_count_per_hour_by_user_id_and_date(user.id, created_at)
except Exception as e:
self._logger.error(
__name__,
f"Cannot add user message count per hour with id {umcph.id}",
e,
)
return False
try:
if is_reaction:
umcph.xp_count += settings.xp_per_reaction
else:
umcph.xp_count += settings.xp_per_message
self._umcphs.update_user_message_count_per_hour(umcph)
self._db.save_changes()
except Exception as e:
self._logger.error(
__name__,
f"Cannot update user message count per hour {created_at}",
f"Cannot update user message count per hour with id {umcph.id}",
e,
)
return False
def is_message_xp_count_by_hour_higher_that_max_message_count_per_hour(
self,
created_at: datetime,
user: User,
settings: ServerConfig,
is_reaction: bool = False,
) -> bool:
try:
umcph = self._umcphs.find_user_message_count_per_hour_by_user_id_and_date(user.id, created_at)
if umcph is None or umcph.xp_count is None:
return False
return umcph.xp_count > settings.max_message_xp_per_hour
except Exception as e:
self._logger.error(
__name__,
f"Cannot add user message count per hour with",
e,
)
if umcph.xp_count is None:
return False
return umcph.xp_count > settings.max_message_xp_per_hour
def get_ontime_for_user(self, user: User) -> float:
return round(
sum(
[
(join.leaved_on - join.joined_on).total_seconds() / 3600
for join in self._user_joined_voice_channel.get_user_joined_voice_channels_by_user_id(user.id)
if join.leaved_on is not None and join.joined_on is not None
]
),
self._user_joined_voice_channel.get_user_joined_voice_channels_by_user_id(user.id)
.where(lambda x: x.leaved_on is not None and x.joined_on is not None)
.sum(lambda join: (join.leaved_on - join.joined_on).total_seconds() / 3600),
2,
)
@@ -219,7 +214,7 @@ class ClientUtilsService(ClientUtilsABC):
guild: Guild = self._bot.guilds.where(lambda g: g == guild).single()
channel = guild.get_channel(discord_channel_id)
message = await channel.fetch_message(discord_message_id)
emoji = List(discord.Emoji, [x for x in guild.emojis if x.name == rule.emoji_name]).single()
emoji = List(discord.Emoji, guild.emojis).where(lambda x: x.name == rule.emoji_name).single()
if emoji is None:
self._logger.debug(__name__, f"Emoji {rule.emoji_name} not found")

View File

@@ -1,8 +1,9 @@
from datetime import datetime, timedelta
from typing import Union
import discord
from cpl_core.configuration import ConfigurationABC
from cpl_core.database.context import DatabaseContextABC
from cpl_discord.container import Member, Guild
from cpl_discord.service import DiscordBotServiceABC
from bot_core.abc.client_utils_abc import ClientUtilsABC
@@ -65,255 +66,356 @@ class DataIntegrityService:
self._is_for_shutdown = False
async def check_data_integrity(self, is_for_shutdown=False):
self._logger.info(__name__, f"Data integrity service started")
if is_for_shutdown != self._is_for_shutdown:
self._is_for_shutdown = is_for_shutdown
def _check_known_users(self):
self._logger.debug(__name__, f"Start checking KnownUsers table, {len(self._bot.users)}")
for u in self._bot.users:
u: discord.User = u
try:
if u.bot:
self._logger.trace(__name__, f"User {u.id} is ignored, because its a bot")
continue
try:
for g in self._bot.guilds:
self._logger.debug(__name__, f"Start check for server: {g.id}")
s = self._get_or_create_server(g)
self._logger.debug(__name__, f"Start check for clients")
self._check_clients(g.id, s)
user = self._known_users.find_user_by_discord_id(u.id)
if user is not None:
continue
for m in [m for m in g.members if not m.bot]:
await self._check_default_role(m)
self._check_known_user(m.id)
self._logger.warn(__name__, f"Unknown user: {u.id}")
self._logger.debug(__name__, f"Add user: {u.id}")
self._known_users.add_user(KnownUser(u.id))
self._db_context.save_changes()
self._logger.debug(__name__, f"Start check for member: {g.id}@{m.id}")
u = self._get_or_create_user(s, m.id)
user = self._known_users.find_user_by_discord_id(u.id)
if user is None:
self._logger.fatal(__name__, f"Cannot add user: {u.id}")
self._logger.debug(__name__, f"Start check for user joined server: {g.id}@{m.id}")
self._check_user_join(g, m, u)
self._logger.debug(__name__, f"Added user: {u.id}")
except Exception as e:
self._logger.error(__name__, f"Cannot get user", e)
self._logger.debug(__name__, f"Start check for user joined voice channels: {g.id}@{m.id}")
self._check_user_joined_vc(g.id, m, u)
def check_servers(self):
self._logger.debug(__name__, f"Start checking Servers table")
for g in self._bot.guilds:
g: discord.Guild = g
try:
server = self._servers.find_server_by_discord_id(g.id)
if server is not None:
continue
self._logger.debug(__name__, f"Start check for user joined game servers: {g.id}@{m.id}")
self._check_user_joined_gs(g.id, m.id, u)
self._logger.warn(__name__, f"Server not found in database: {g.id}")
self._logger.debug(__name__, f"Add server: {g.id}")
self._servers.add_server(Server(g.id))
self._db_context.save_changes()
self._logger.debug(__name__, f"Start check for user got achievements: {g.id}@{m.id}")
await self._check_for_user_achievements(u)
server = self._servers.find_server_by_discord_id(g.id)
if server is None:
self._logger.fatal(__name__, f"Cannot add server: {g.id}")
for m in [m for m in g.members if m.bot]:
u = self._users.find_user_by_discord_id_and_server_id(m.id, s.id)
if u is None:
continue
self._remove_bot(u)
self._logger.info(__name__, f"Data integrity service finished")
except Exception as e:
self._logger.fatal(__name__, f"Checking data integrity failed", e)
def _get_or_create_server(self, guild: Guild) -> Server:
try:
server = self._servers.find_server_by_discord_id(guild.id)
if server is not None:
return server
self._logger.warn(__name__, f"Server not found in database: {guild.id}")
self._logger.debug(__name__, f"Add server: {guild.id}")
self._servers.add_server(Server(guild.id))
self._db_context.save_changes()
server = self._servers.find_server_by_discord_id(guild.id)
if server is None:
self._logger.fatal(__name__, f"Cannot add server: {guild.id}")
self._logger.trace(__name__, f"Added server: {guild.id}")
return server
except Exception as e:
self._logger.error(__name__, f"Cannot get server", e)
def _check_clients(self, guild_id: int, server: Server):
try:
client = self._clients.find_client_by_server_id(server.id)
if client is not None:
return
self._logger.warn(
__name__,
f"Client for server {guild_id} not found in database: {self._bot.user.id}",
)
self._logger.debug(__name__, f"Add client: {self._bot.user.id}")
self._clients.add_client(Client(self._bot.user.id, 0, 0, 0, 0, 0, server))
self._db_context.save_changes()
client = self._clients.find_client_by_server_id(server.id)
if client is None:
self._logger.fatal(
__name__,
f"Cannot add client {self._bot.user.id} for server {guild_id}",
)
self._logger.trace(__name__, f"Added client: {guild_id}")
except Exception as e:
self._logger.error(__name__, f"Cannot get client", e)
self._logger.debug(__name__, f"Added server: {g.id}")
except Exception as e:
self._logger.error(__name__, f"Cannot get server", e)
results = self._servers.get_servers()
if results is None or len(results) == 0:
self._logger.error(__name__, f"Table Servers is empty!")
def _check_known_user(self, member_id: int):
try:
if self._known_users.find_user_by_discord_id(member_id) is not None:
return
def _check_clients(self):
self._logger.debug(__name__, f"Start checking Clients table")
for g in self._bot.guilds:
g: discord.Guild = g
try:
server: Server = self._servers.find_server_by_discord_id(g.id)
if server is None:
self._logger.fatal(__name__, f"Server not found in database: {g.id}")
self._logger.warn(__name__, f"Unknown user: {member_id}")
self._logger.trace(__name__, f"Add known user: {member_id}")
self._known_users.add_user(KnownUser(member_id))
self._db_context.save_changes()
user = self._known_users.find_user_by_discord_id(member_id)
if user is None:
self._logger.fatal(__name__, f"Cannot add user: {member_id}")
self._logger.trace(__name__, f"Added known user: {member_id}")
except Exception as e:
self._logger.error(__name__, f"Cannot get user", e)
def _get_or_create_user(self, server: Server, member_id: int) -> User:
try:
user = self._users.find_user_by_discord_id_and_server_id(member_id, server.id)
if user is not None:
return user
self._logger.warn(__name__, f"User not found in database: {member_id}")
self._logger.debug(__name__, f"Add user: {member_id}")
self._users.add_user(User(member_id, 0, 0, 0, None, server))
self._db_context.save_changes()
self._logger.trace(__name__, f"Added User: {member_id}")
return self._users.get_user_by_discord_id_and_server_id(member_id, server.id)
except Exception as e:
self._logger.error(__name__, f"Cannot get User", e)
results = self._users.get_users()
if results is None or len(results) == 0:
self._logger.error(__name__, f"Table Users is empty!")
def _check_user_join(self, guild: Guild, member: Member, user: User):
try:
join = self._user_joins.find_active_user_joined_server_by_user_id(user.id)
if join is not None:
return
self._logger.warn(
__name__,
f"Active UserJoinedServer not found in database: {guild.id}:{member.id}@{member.joined_at}",
)
self._logger.debug(
__name__,
f"Add UserJoinedServer: {guild.id}:{member.id}@{member.joined_at}",
)
self._user_joins.add_user_joined_server(UserJoinedServer(user, self._dtp.transform(member.joined_at), None))
self._db_context.save_changes()
self._logger.trace(__name__, f"Added UserJoinedServer: {member.id}")
except Exception as e:
self._logger.error(__name__, f"Cannot get UserJoinedServer", e)
try:
joins = self._user_joins.get_user_joined_servers()
for join in [x for x in joins if x.user.server.discord_id == guild.id and x.leaved_on is None]:
dc_user = guild.get_member(join.user.discord_id)
if dc_user is not None:
client = self._clients.find_client_by_server_id(server.id)
if client is not None:
continue
self._logger.warn(
__name__,
f"User {join.user.discord_id} already left the server.",
f"Client for server {g.id} not found in database: {self._bot.user.id}",
)
join.leaved_on = datetime.now()
self._user_joins.update_user_joined_server(join)
self._logger.debug(__name__, f"Add client: {self._bot.user.id}")
self._clients.add_client(Client(self._bot.user.id, 0, 0, 0, 0, 0, server))
self._db_context.save_changes()
except Exception as e:
self._logger.error(__name__, f"Cannot update UserJoinedServer", e)
def _check_user_joined_vc(self, guild_id: int, member: Member, user: User):
settings: ServerConfig = self._config.get_configuration(f"ServerConfig_{guild_id}")
client = self._clients.find_client_by_server_id(server.id)
if client is None:
self._logger.fatal(
__name__,
f"Cannot add client {self._bot.user.id} for server {g.id}",
)
try:
# close open voice states
joins = self._user_joins_vc.find_active_user_joined_voice_channels_by_user_id(user.id)
if joins is None or len(joins) == 0:
return
self._logger.debug(__name__, f"Added client: {g.id}")
except Exception as e:
self._logger.error(__name__, f"Cannot get client", e)
results = self._servers.get_servers()
if results is None or len(results) == 0:
self._logger.error(__name__, f"Table Servers is empty!")
def _check_users(self):
self._logger.debug(__name__, f"Start checking Users table")
for g in self._bot.guilds:
g: discord.Guild = g
try:
server = self._servers.find_server_by_discord_id(g.id)
if server is None:
self._logger.fatal(__name__, f"Server not found in database: {g.id}")
for u in g.members:
u: Union[discord.Member, discord.User] = u
if u.bot:
self._logger.trace(__name__, f"User {u.id} is ignored, because its a bot")
continue
user = self._users.find_user_by_discord_id_and_server_id(u.id, server.id)
if user is not None:
continue
self._logger.warn(__name__, f"User not found in database: {u.id}")
self._logger.debug(__name__, f"Add user: {u.id}")
self._users.add_user(User(u.id, 0, 0, 0, None, server))
self._db_context.save_changes()
self._logger.debug(__name__, f"Added User: {u.id}")
except Exception as e:
self._logger.error(__name__, f"Cannot get User", e)
results = self._users.get_users()
if results is None or len(results) == 0:
self._logger.error(__name__, f"Table Users is empty!")
def _check_user_joins(self):
self._logger.debug(__name__, f"Start checking UserJoinedServers table")
for guild in self._bot.guilds:
guild: discord.Guild = guild
server = self._servers.find_server_by_discord_id(guild.id)
if server is None:
self._logger.fatal(__name__, f"Server not found in database: {guild.id}")
try:
for u in guild.members:
u: discord.User = u
if u.bot:
self._logger.trace(__name__, f"User {u.id} is ignored, because its a bot")
continue
user = self._users.find_user_by_discord_id_and_server_id(u.id, server.id)
if user is None:
self._logger.fatal(__name__, f"User not found in database: {u.id}")
join = self._user_joins.find_active_user_joined_server_by_user_id(user.id)
if join is not None:
continue
m: discord.Member = u
self._logger.warn(
__name__,
f"Active UserJoinedServer not found in database: {guild.id}:{u.id}@{m.joined_at}",
)
self._logger.debug(
__name__,
f"Add UserJoinedServer: {guild.id}:{u.id}@{m.joined_at}",
)
self._user_joins.add_user_joined_server(
UserJoinedServer(user, self._dtp.transform(m.joined_at), None)
)
self._db_context.save_changes()
self._logger.debug(__name__, f"Added UserJoinedServer: {u.id}")
except Exception as e:
self._logger.error(__name__, f"Cannot get UserJoinedServer", e)
results = self._users.get_users()
if results is None or len(results) == 0:
self._logger.error(__name__, f"Table Users is empty!")
joins = self._user_joins.get_user_joined_servers()
for join in joins:
self._logger.warn(
__name__,
f"Active UserJoinedVoiceChannel found in database: {guild_id}:{member.id}@{join.joined_on}",
)
join.leaved_on = datetime.now()
join: UserJoinedServer = join
if join.user.server.discord_id != guild.id:
continue
if ((join.leaved_on - join.joined_on).total_seconds() / 60 / 60) > settings.max_voice_state_hours:
join.leaved_on = join.joined_on + timedelta(hours=settings.max_voice_state_hours)
if join.leaved_on is not None:
continue
self._user_joins_vc.update_user_joined_voice_channel(join)
dc_user = guild.get_member(join.user.discord_id)
if dc_user is None:
self._logger.warn(
__name__,
f"User {join.user.discord_id} already left the server.",
)
join.leaved_on = datetime.now()
self._user_joins.update_user_joined_server(join)
if self._is_for_shutdown:
user.xp += round(join.time * settings.xp_per_ontime_hour)
self._users.update_user(user)
self._db_context.save_changes()
if self._is_for_shutdown:
return
# add open voice states
if member.voice is None or member.voice.channel.id in settings.afk_channel_ids:
return
join = UserJoinedVoiceChannel(user, member.voice.channel.id, datetime.now())
self._user_joins_vc.add_user_joined_voice_channel(join)
self._db_context.save_changes()
except Exception as e:
self._logger.error(__name__, f"Cannot get UserJoinedVoiceChannel", e)
def _check_user_joined_gs(self, guild_id: int, member_id: int, user: User):
try:
joins = self._user_joined_gs.find_active_user_joined_game_servers_by_user_id(user.id)
if joins is None or len(joins) == 0:
return
def _check_user_joins_vc(self):
self._logger.debug(__name__, f"Start checking UserJoinedVoiceChannel table")
for guild in self._bot.guilds:
guild: discord.Guild = guild
settings: ServerConfig = self._config.get_configuration(f"ServerConfig_{guild.id}")
for join in joins:
self._logger.warn(
__name__,
f"Active UserJoinedGameServer found in database: {guild_id}:{member_id}@{join.joined_on}",
)
join.leaved_on = datetime.now()
settings: ServerConfig = self._config.get_configuration(f"ServerConfig_{guild_id}")
server = self._servers.find_server_by_discord_id(guild.id)
if server is None:
self._logger.fatal(__name__, f"Server not found in database: {guild.id}")
if join.time > settings.max_voice_state_hours:
join.leaved_on = join.joined_on + timedelta(hours=settings.max_voice_state_hours)
try:
# close open voice states
for member in guild.members:
if member.bot:
self._logger.trace(__name__, f"User {member.id} is ignored, because its a bot")
continue
self._user_joined_gs.update_user_joined_game_server(join)
if self._is_for_shutdown:
user.xp += round(join.time * settings.xp_per_ontime_hour)
self._users.update_user(user)
user = self._users.find_user_by_discord_id_and_server_id(member.id, server.id)
if user is None:
self._logger.fatal(__name__, f"User not found in database: {member.id}")
joins = self._user_joins_vc.find_active_user_joined_voice_channels_by_user_id(user.id)
if joins is None or len(joins) == 0:
continue
for join in joins:
self._logger.warn(
__name__,
f"Active UserJoinedVoiceChannel found in database: {guild.id}:{member.id}@{join.joined_on}",
)
join.leaved_on = datetime.now()
if (
(join.leaved_on - join.joined_on).total_seconds() / 60 / 60
) > settings.max_voice_state_hours:
join.leaved_on = join.joined_on + timedelta(hours=settings.max_voice_state_hours)
self._user_joins_vc.update_user_joined_voice_channel(join)
if self._is_for_shutdown:
user.xp += round(join.time * settings.xp_per_ontime_hour)
self._users.update_user(user)
self._db_context.save_changes()
if self._is_for_shutdown:
return
# add open voice states
for member in guild.members:
if member.bot:
self._logger.trace(__name__, f"User {member.id} is ignored, because its a bot")
continue
if member.voice is None or member.voice.channel.id in settings.afk_channel_ids:
continue
user = self._users.find_user_by_discord_id_and_server_id(member.id, server.id)
if user is None:
self._logger.fatal(__name__, f"User not found in database: {member.id}")
join = UserJoinedVoiceChannel(user, member.voice.channel.id, datetime.now())
self._user_joins_vc.add_user_joined_voice_channel(join)
self._db_context.save_changes()
except Exception as e:
self._logger.error(__name__, f"Cannot get UserJoinedVoiceChannel", e)
def _check_user_joined_gs(self):
self._logger.debug(__name__, f"Start checking UserJoinedGameServer table")
for guild in self._bot.guilds:
guild: discord.Guild = guild
server = self._servers.find_server_by_discord_id(guild.id)
if server is None:
self._logger.fatal(__name__, f"Server not found in database: {guild.id}")
try:
for member in guild.members:
if member.bot:
self._logger.trace(__name__, f"User {member.id} is ignored, because its a bot")
continue
user = self._users.find_user_by_discord_id_and_server_id(member.id, server.id)
if user is None:
self._logger.fatal(__name__, f"User not found in database: {member.id}")
joins = self._user_joined_gs.find_active_user_joined_game_servers_by_user_id(user.id)
if joins is None or len(joins) == 0:
continue
for join in joins:
self._logger.warn(
__name__,
f"Active UserJoinedGameServer found in database: {guild.id}:{member.id}@{join.joined_on}",
)
join.leaved_on = datetime.now()
settings: ServerConfig = self._config.get_configuration(f"ServerConfig_{guild.id}")
if (
(join.leaved_on - join.joined_on).total_seconds() / 60 / 60
) > settings.max_voice_state_hours:
join.leaved_on = join.joined_on + timedelta(hours=settings.max_voice_state_hours)
self._user_joined_gs.update_user_joined_game_server(join)
if self._is_for_shutdown:
user.xp += round(join.time * settings.xp_per_ontime_hour)
self._users.update_user(user)
self._db_context.save_changes()
except Exception as e:
self._logger.error(__name__, f"Cannot get UserJoinedGameServer", e)
async def _check_for_user_achievements(self):
self._logger.debug(__name__, f"Start checking UserGotAchievement table")
for guild in self._bot.guilds:
server = self._servers.find_server_by_discord_id(guild.id)
if server is None:
self._logger.fatal(__name__, f"Server not found in database: {guild.id}")
for member in guild.members:
if member.bot:
self._logger.trace(__name__, f"User {member.id} is ignored, because its a bot")
continue
user = self._users.find_user_by_discord_id_and_server_id(member.id, server.id)
if user is None:
self._logger.fatal(__name__, f"User not found in database: {member.id}")
await self._achievements.validate_achievements_for_user(user)
async def _check_default_role(self):
for guild in self._bot.guilds:
for member in guild.members:
await self._client_utils.check_default_role(member)
def _check_for_bots(self):
for guild in self._bot.guilds:
server = self._servers.get_server_by_discord_id(guild.id)
for member in guild.members.where(lambda x: x.bot):
user = self._users.find_user_by_discord_id_and_server_id(member.id, server.id)
if user is None:
continue
for join in self._user_joins.get_user_joined_servers_by_user_id(user.id):
self._user_joins.delete_user_joined_server(join)
self._user_joins_vc.delete_user_joined_voice_channel_by_user_id(user.id)
self._users.delete_user(user)
self._db_context.save_changes()
except Exception as e:
self._logger.error(__name__, f"Cannot get UserJoinedGameServer", e)
async def _check_for_user_achievements(self, user: User):
try:
await self._achievements.validate_achievements_for_user(user)
except Exception as e:
self._logger.error(__name__, f"Cannot check UserGotAchievement for {user.id}", e)
async def check_data_integrity(self, is_for_shutdown=False):
if is_for_shutdown != self._is_for_shutdown:
self._is_for_shutdown = is_for_shutdown
async def _check_default_role(self, member: Member):
await self._client_utils.check_default_role(member)
def _remove_bot(self, user: User):
known_user = self._known_users.find_user_by_discord_id(user.discord_id)
if known_user is not None:
self._known_users.delete_user(known_user)
for join in self._user_joins.get_user_joined_servers_by_user_id(user.id):
self._user_joins.delete_user_joined_server(join)
self._user_joins_vc.delete_user_joined_voice_channel_by_user_id(user.id)
self._users.delete_user(user)
self._db_context.save_changes()
await self._check_default_role()
self._check_known_users()
self.check_servers()
self._check_clients()
self._check_users()
self._check_user_joins()
self._check_user_joins_vc()
self._check_user_joined_gs()
await self._check_for_user_achievements()
self._check_for_bots()

View File

@@ -19,6 +19,10 @@ class UserJoinedServerRepositoryABC(ABC):
def get_user_joined_server_by_id(self, id: int) -> UserJoinedServer:
pass
@abstractmethod
def get_user_joined_server_by_server_id(self, server_id: int) -> UserJoinedServer:
pass
@abstractmethod
def get_user_joined_servers_by_user_id(self, user_id: int) -> list[UserJoinedServer]:
pass

View File

@@ -67,6 +67,15 @@ class UserJoinedServer(TableABC):
"""
)
@staticmethod
def get_select_by_server_id_string(id: int) -> str:
return str(
f"""
SELECT * FROM `UserJoinedServers`
WHERE `ServerId` = {id};
"""
)
@staticmethod
def get_select_by_user_id_string(id: int) -> str:
return str(

View File

@@ -48,60 +48,66 @@ class AchievementRepositoryService(AchievementRepositoryABC):
)
def get_achievements(self) -> List[Achievement]:
achievements = List(Achievement)
self._logger.trace(__name__, f"Send SQL command: {Achievement.get_select_all_string()}")
results = self._context.select(Achievement.get_select_all_string())
for result in results:
self._logger.trace(__name__, f"Get user with id {result[0]}")
achievements.append(self._from_result(result))
return List(
Achievement,
[self._from_result(result) for result in self._context.select(Achievement.get_select_all_string())],
)
return achievements
def get_achievement_by_id(self, id: int) -> Achievement:
self._logger.trace(__name__, f"Send SQL command: {Achievement.get_select_by_id_string(id)}")
return self._from_result(self._context.select(Achievement.get_select_by_id_string(id))[0])
result = self._context.select(Achievement.get_select_by_id_string(id))[0]
return self._from_result(result)
def get_achievements_by_server_id(self, server_id: int) -> List[Achievement]:
achievements = List(Achievement)
self._logger.trace(
__name__,
f"Send SQL command: {Achievement.get_select_by_server_id_string(server_id)}",
)
results = self._context.select(Achievement.get_select_by_server_id_string(server_id))
for result in results:
self._logger.trace(__name__, f"Get user with id {result[0]}")
achievements.append(self._from_result(result))
return List(
Achievement,
[
self._from_result(result)
for result in self._context.select(Achievement.get_select_by_server_id_string(server_id))
],
)
return achievements
def get_achievements_by_user_id(self, user_id: int) -> List[Achievement]:
achievements = List(Achievement)
achievements_joins = List(UserGotAchievement)
self._logger.trace(
__name__,
f"Send SQL command: {UserGotAchievement.get_select_by_user_id_string(user_id)}",
)
results = self._context.select(UserGotAchievement.get_select_by_user_id_string(user_id))
for result in results:
self._logger.trace(__name__, f"Got UserGotAchievement with id {result[0]}")
achievements_joins.append(self._join_from_result(result))
return List(
UserGotAchievement,
[
self._join_from_result(result).achievement
for result in self._context.select(UserGotAchievement.get_select_by_user_id_string(user_id))
],
)
for achievements_join in achievements_joins:
results = self._context.select(Achievement.get_select_by_id_string(achievements_join.achievement.id))
for result in results:
self._logger.trace(__name__, f"Got Achievement with id {result[0]}")
achievements.append(self._from_result(result))
return achievements
def get_user_got_achievements_by_achievement_id(self, achievement_id: int) -> List[Achievement]:
achievements_joins = List(UserGotAchievement)
self._logger.trace(
__name__,
f"Send SQL command: {UserGotAchievement.get_select_by_achievement_id_string(achievement_id)}",
)
results = self._context.select(UserGotAchievement.get_select_by_achievement_id_string(achievement_id))
for result in results:
self._logger.trace(__name__, f"Got UserGotAchievement with id {result[0]}")
achievements_joins.append(self._join_from_result(result))
return List(
UserGotAchievement,
[
self._join_from_result(result)
for result in self._context.select(
UserGotAchievement.get_select_by_achievement_id_string(achievement_id)
)
],
)
return achievements_joins
def add_achievement(self, achievement: Achievement):
self._logger.trace(__name__, f"Send SQL command: {achievement.insert_string}")

View File

@@ -44,12 +44,13 @@ class ApiKeyRepositoryService(ApiKeyRepositoryABC):
return api_key
def get_api_keys(self) -> List[ApiKey]:
api_keys = List(ApiKey)
self._logger.trace(__name__, f"Send SQL command: {ApiKey.get_select_all_string()}")
results = self._context.select(ApiKey.get_select_all_string())
for result in results:
api_keys.append(self._api_key_from_result(result))
return List(
ApiKey,
[self._api_key_from_result(result) for result in self._context.select(ApiKey.get_select_all_string())],
)
return api_keys
def get_api_key(self, identifier: str, key: str) -> ApiKey:
self._logger.trace(__name__, f"Send SQL command: {ApiKey.get_select_string(identifier, key)}")

View File

@@ -64,16 +64,23 @@ class AuthUserRepositoryService(AuthUserRepositoryABC):
__name__,
f"Send SQL command: {auth_user.get_select_user_id_from_relations()}",
)
relation_ids = List(int)
results = self._context.select(auth_user.get_select_user_id_from_relations())
for result in results:
self._logger.trace(__name__, f"Got auth user relation with id {result[0]}")
relation_ids.append(result[0])
return List(int, [result[0] for result in self._context.select(auth_user.get_select_user_id_from_relations())])
return relation_ids
def get_all_auth_users(self) -> List[AuthUser]:
users = List(AuthUser)
self._logger.trace(__name__, f"Send SQL command: {AuthUser.get_select_all_string()}")
results = self._context.select(AuthUser.get_select_all_string())
for result in results:
self._logger.trace(__name__, f"Get auth user with id {result[0]}")
users.append(self._user_from_result(result))
return List(
AuthUser,
[self._user_from_result(result) for result in self._context.select(AuthUser.get_select_all_string())],
)
return users
def get_filtered_auth_users(self, criteria: AuthUserSelectCriteria) -> FilteredResult:
users = self.get_all_auth_users()
@@ -122,7 +129,8 @@ class AuthUserRepositoryService(AuthUserRepositoryABC):
def get_auth_user_by_email(self, email: str) -> AuthUser:
self._logger.trace(__name__, f"Send SQL command: {AuthUser.get_select_by_email_string(email)}")
return self._user_from_result(self._context.select(AuthUser.get_select_by_email_string(email))[0])
result = self._context.select(AuthUser.get_select_by_email_string(email))[0]
return self._user_from_result(result)
def find_auth_user_by_email(self, email: str) -> Optional[AuthUser]:
self._logger.trace(__name__, f"Send SQL command: {AuthUser.get_select_by_email_string(email)}")
@@ -130,7 +138,9 @@ class AuthUserRepositoryService(AuthUserRepositoryABC):
if result is None or len(result) == 0:
return None
return self._user_from_result(result[0])
result = result[0]
return self._user_from_result(result)
def find_auth_user_by_confirmation_id(self, id: str) -> Optional[AuthUser]:
self._logger.trace(
@@ -141,7 +151,9 @@ class AuthUserRepositoryService(AuthUserRepositoryABC):
if result is None or len(result) == 0:
return None
return self._user_from_result(result[0])
result = result[0]
return self._user_from_result(result)
def find_auth_user_by_forgot_password_id(self, id: str) -> Optional[AuthUser]:
self._logger.trace(
@@ -152,7 +164,9 @@ class AuthUserRepositoryService(AuthUserRepositoryABC):
if result is None or len(result) == 0:
return None
return self._user_from_result(result[0])
result = result[0]
return self._user_from_result(result)
def add_auth_user(self, user: AuthUser):
self._logger.trace(__name__, f"Send SQL command: {user.insert_string}")

View File

@@ -23,7 +23,27 @@ class AutoRoleRepositoryService(AutoRoleRepositoryABC):
AutoRoleRepositoryABC.__init__(self)
def _from_result(self, result: tuple) -> AutoRole:
def get_auto_roles(self) -> List[AutoRole]:
auto_roles = List(AutoRole)
self._logger.trace(__name__, f"Send SQL command: {AutoRole.get_select_all_string()}")
results = self._context.select(AutoRole.get_select_all_string())
for result in results:
auto_roles.append(
AutoRole(
self._servers.get_server_by_id(result[1]),
result[2],
result[3],
result[4],
result[5],
id=result[0],
)
)
return auto_roles
def get_auto_role_by_id(self, id: int) -> AutoRole:
self._logger.trace(__name__, f"Send SQL command: {AutoRole.get_select_by_id_string(id)}")
result = self._context.select(AutoRole.get_select_by_id_string(id))[0]
return AutoRole(
self._servers.get_server_by_id(result[1]),
result[2],
@@ -33,37 +53,55 @@ class AutoRoleRepositoryService(AutoRoleRepositoryABC):
id=result[0],
)
def get_auto_roles(self) -> List[AutoRole]:
self._logger.trace(__name__, f"Send SQL command: {AutoRole.get_select_all_string()}")
return List(
AutoRole, [self._from_result(result) for result in self._context.select(AutoRole.get_select_all_string())]
)
def get_auto_role_by_id(self, id: int) -> AutoRole:
self._logger.trace(__name__, f"Send SQL command: {AutoRole.get_select_by_id_string(id)}")
return self._from_result(self._context.select(AutoRole.get_select_by_id_string(id))[0])
def find_auto_role_by_id(self, id: int) -> Optional[AutoRole]:
self._logger.trace(__name__, f"Send SQL command: {AutoRole.get_select_by_id_string(id)}")
result = self._context.select(AutoRole.get_select_by_id_string(id))
if result is None or len(result) == 0:
return None
return self._from_result(result[0])
result = result[0]
return AutoRole(
self._servers.get_server_by_id(result[1]),
result[2],
result[3],
result[4],
result[5],
id=result[0],
)
def get_auto_roles_by_server_id(self, id: int) -> List[AutoRole]:
self._logger.trace(__name__, f"Send SQL command: {AutoRole.get_select_by_server_id_string(id)}")
return List(
AutoRole,
[self._from_result(result) for result in self._context.select(AutoRole.get_select_by_server_id_string(id))],
)
auto_roles = List(AutoRole)
results = self._context.select(AutoRole.get_select_by_server_id_string(id))
for result in results:
auto_roles.append(
AutoRole(
self._servers.get_server_by_id(result[1]),
result[2],
result[3],
result[4],
result[5],
id=result[0],
)
)
return auto_roles
def get_auto_role_by_message_id(self, id: int) -> AutoRole:
self._logger.trace(
__name__,
f"Send SQL command: {AutoRole.get_select_by_message_id_string(id)}",
)
return self._from_result(self._context.select(AutoRole.get_select_by_message_id_string(id))[0])
result = self._context.select(AutoRole.get_select_by_message_id_string(id))[0]
return AutoRole(
self._servers.get_server_by_id(result[1]),
result[2],
result[3],
result[4],
result[5],
id=result[0],
)
def find_auto_role_by_message_id(self, id: int) -> Optional[AutoRole]:
self._logger.trace(
@@ -74,7 +112,16 @@ class AutoRoleRepositoryService(AutoRoleRepositoryABC):
if result is None or len(result) == 0:
return None
return self._from_result(result[0])
result = result[0]
return AutoRole(
self._servers.get_server_by_id(result[1]),
result[2],
result[3],
result[4],
result[5],
id=result[0],
)
def add_auto_role(self, auto_role: AutoRole):
self._logger.trace(__name__, f"Send SQL command: {auto_role.insert_string}")
@@ -88,7 +135,27 @@ class AutoRoleRepositoryService(AutoRoleRepositoryABC):
self._logger.trace(__name__, f"Send SQL command: {auto_role.delete_string}")
self._context.cursor.execute(auto_role.delete_string)
def _rule_from_result(self, result: tuple) -> AutoRoleRule:
def get_auto_role_rules(self) -> List[AutoRoleRule]:
auto_role_rules = List(AutoRoleRule)
self._logger.trace(__name__, f"Send SQL command: {AutoRoleRule.get_select_all_string()}")
results = self._context.select(AutoRoleRule.get_select_all_string())
for result in results:
auto_role_rules.append(
AutoRoleRule(
self.get_auto_role_by_id(result[1]),
result[2],
result[3],
result[4],
result[5],
id=result[0],
)
)
return auto_role_rules
def get_auto_role_rule_by_id(self, id: int) -> AutoRoleRule:
self._logger.trace(__name__, f"Send SQL command: {AutoRoleRule.get_select_by_id_string(id)}")
result = self._context.select(AutoRoleRule.get_select_by_id_string(id))[0]
return AutoRoleRule(
self.get_auto_role_by_id(result[1]),
result[2],
@@ -98,29 +165,26 @@ class AutoRoleRepositoryService(AutoRoleRepositoryABC):
id=result[0],
)
def get_auto_role_rules(self) -> List[AutoRoleRule]:
self._logger.trace(__name__, f"Send SQL command: {AutoRoleRule.get_select_all_string()}")
return List(
AutoRoleRule,
[self._rule_from_result(result) for result in self._context.select(AutoRoleRule.get_select_all_string())],
)
def get_auto_role_rule_by_id(self, id: int) -> AutoRoleRule:
self._logger.trace(__name__, f"Send SQL command: {AutoRoleRule.get_select_by_id_string(id)}")
return self._rule_from_result(self._context.select(AutoRoleRule.get_select_by_id_string(id))[0])
def get_auto_role_rules_by_auto_role_id(self, id: int) -> List[AutoRoleRule]:
auto_role_rules = List(AutoRoleRule)
self._logger.trace(
__name__,
f"Send SQL command: {AutoRoleRule.get_select_by_auto_role_id_string(id)}",
)
return List(
AutoRoleRule,
[
self._rule_from_result(result)
for result in self._context.select(AutoRoleRule.get_select_by_auto_role_id_string(id))
],
)
results = self._context.select(AutoRoleRule.get_select_by_auto_role_id_string(id))
for result in results:
auto_role_rules.append(
AutoRoleRule(
self.get_auto_role_by_id(result[1]),
result[2],
result[3],
result[4],
result[5],
id=result[0],
)
)
return auto_role_rules
def add_auto_role_rule(self, auto_role_rule: AutoRoleRule):
self._logger.trace(__name__, f"Send SQL command: {auto_role_rule.insert_string}")

View File

@@ -23,7 +23,32 @@ class ClientRepositoryService(ClientRepositoryABC):
ClientRepositoryABC.__init__(self)
def _from_result(self, result: tuple) -> Client:
def get_clients(self) -> List[Client]:
clients = List(Client)
self._logger.trace(__name__, f"Send SQL command: {Client.get_select_all_string()}")
results = self._context.select(Client.get_select_all_string())
for result in results:
self._logger.trace(__name__, f"Get client with id {result[0]}")
clients.append(
Client(
result[1],
result[2],
result[3],
result[4],
result[5],
result[6],
self._servers.get_server_by_id(result[7]),
result[8],
result[9],
id=result[0],
)
)
return clients
def get_client_by_id(self, client_id: int) -> Client:
self._logger.trace(__name__, f"Send SQL command: {Client.get_select_by_id_string(client_id)}")
result = self._context.select(Client.get_select_by_id_string(client_id))
return Client(
result[1],
result[2],
@@ -37,37 +62,49 @@ class ClientRepositoryService(ClientRepositoryABC):
id=result[0],
)
def get_clients(self) -> List[Client]:
self._logger.trace(__name__, f"Send SQL command: {Client.get_select_all_string()}")
return List(
Client, [self._from_result(result) for result in self._context.select(Client.get_select_all_string())]
)
def get_client_by_id(self, client_id: int) -> Client:
self._logger.trace(__name__, f"Send SQL command: {Client.get_select_by_id_string(client_id)}")
return self._from_result(self._context.select(Client.get_select_by_id_string(client_id))[0])
def get_clients_by_server_id(self, server_id: int) -> List[Client]:
clients = List(Client)
self._logger.trace(
__name__,
f"Send SQL command: {Client.get_select_by_server_id_string(server_id)}",
)
results = self._context.select(Client.get_select_by_server_id_string(server_id))
for result in results:
clients.append(
Client(
result[1],
result[2],
result[3],
result[4],
result[5],
result[6],
self._servers.get_server_by_id(result[7]),
result[8],
result[9],
id=result[0],
)
)
return List(
Client,
[
self._from_result(result)
for result in self._context.select(Client.get_select_by_server_id_string(server_id))
],
)
return clients
def get_client_by_discord_id(self, discord_id: int) -> Client:
self._logger.trace(
__name__,
f"Send SQL command: {Client.get_select_by_discord_id_string(discord_id)}",
)
return self._from_result(self._context.select(Client.get_select_by_discord_id_string(discord_id))[0])
result = self._context.select(Client.get_select_by_discord_id_string(discord_id))[0]
return Client(
result[1],
result[2],
result[3],
result[4],
result[5],
result[6],
self._servers.get_server_by_id(result[7]),
result[8],
result[9],
id=result[0],
)
def find_client_by_discord_id(self, discord_id: int) -> Optional[Client]:
self._logger.trace(
@@ -78,7 +115,20 @@ class ClientRepositoryService(ClientRepositoryABC):
if result is None or len(result) == 0:
return None
return self._from_result(result[0])
result = result[0]
return Client(
result[1],
result[2],
result[3],
result[4],
result[5],
result[6],
self._servers.get_server_by_id(result[7]),
result[8],
result[9],
id=result[0],
)
def find_client_by_server_id(self, discord_id: int) -> Optional[Client]:
self._logger.trace(
@@ -89,7 +139,20 @@ class ClientRepositoryService(ClientRepositoryABC):
if result is None or len(result) == 0:
return None
return self._from_result(result[0])
result = result[0]
return Client(
result[1],
result[2],
result[3],
result[4],
result[5],
result[6],
self._servers.get_server_by_id(result[7]),
result[8],
result[9],
id=result[0],
)
def find_client_by_discord_id_and_server_id(self, discord_id: int, server_id: int) -> Optional[Client]:
self._logger.trace(
@@ -100,7 +163,20 @@ class ClientRepositoryService(ClientRepositoryABC):
if result is None or len(result) == 0:
return None
return self._from_result(result[0])
result = result[0]
return Client(
result[1],
result[2],
result[3],
result[4],
result[5],
result[6],
self._servers.get_server_by_id(result[7]),
result[8],
result[9],
id=result[0],
)
def add_client(self, client: Client):
self._logger.trace(__name__, f"Send SQL command: {client.insert_string}")

View File

@@ -35,49 +35,49 @@ class GameServerRepositoryService(GameServerRepositoryABC):
)
def get_game_servers(self) -> List[GameServer]:
game_servers = List(GameServer)
self._logger.trace(
__name__,
f"Send SQL command: {GameServer.get_select_all_string()}",
)
results = self._context.select(GameServer.get_select_all_string())
for result in results:
self._logger.trace(__name__, f"Get user-joined-game-server with id {result[0]}")
game_servers.append(self._from_result(result))
return List(
GameServer,
[self._from_result(result) for result in self._context.select(GameServer.get_select_all_string())],
)
return game_servers
def get_game_servers_by_server_id(self, id: int) -> List[GameServer]:
game_servers = List(GameServer)
self._logger.trace(
__name__,
f"Send SQL command: {GameServer.get_select_by_server_id_string(id)}",
)
results = self._context.select(GameServer.get_select_by_server_id_string(id))
for result in results:
self._logger.trace(__name__, f"Get user-joined-game-server with id {result[0]}")
game_servers.append(self._from_result(result))
return List(
GameServer,
[
self._from_result(result)
for result in self._context.select(GameServer.get_select_by_server_id_string(id))
],
)
return game_servers
def get_game_server_by_id(self, id: int) -> GameServer:
self._logger.trace(
__name__,
f"Send SQL command: {GameServer.get_select_by_id_string(id)}",
)
return self._from_result(self._context.select(GameServer.get_select_by_id_string(id))[0])
result = self._context.select(GameServer.get_select_by_id_string(id))[0]
return self._from_result(result)
def get_game_servers_by_api_key_id(self, id: int) -> List[GameServer]:
self._logger.trace(
__name__,
f"Send SQL command: {GameServer.get_select_by_api_key_id_string(id)}",
)
return List(
GameServer,
[
self._from_result(result)
for result in self._context.select(GameServer.get_select_by_api_key_id_string(id))
],
)
game_servers = List(GameServer)
results = self._context.select(GameServer.get_select_by_api_key_id_string(id))
for result in results:
game_servers.append(self._from_result(result))
return game_servers
def add_game_server(self, game_server: GameServer):
self._logger.trace(__name__, f"Send SQL command: {game_server.insert_string}")

View File

@@ -4,8 +4,8 @@ from cpl_core.database.context import DatabaseContextABC
from cpl_query.extension import List
from bot_core.logging.database_logger import DatabaseLogger
from bot_data.abc.level_repository_abc import LevelRepositoryABC
from bot_data.abc.server_repository_abc import ServerRepositoryABC
from bot_data.abc.level_repository_abc import LevelRepositoryABC
from bot_data.model.level import Level
@@ -41,15 +41,20 @@ class LevelRepositoryService(LevelRepositoryABC):
)
def get_levels(self) -> List[Level]:
levels = List(Level)
self._logger.trace(__name__, f"Send SQL command: {Level.get_select_all_string()}")
return List(
Level, [self._level_from_result(result) for result in self._context.select(Level.get_select_all_string())]
)
results = self._context.select(Level.get_select_all_string())
for result in results:
self._logger.trace(__name__, f"Get level with id {result[0]}")
levels.append(self._level_from_result(result))
return levels
def get_level_by_id(self, id: int) -> Level:
self._logger.trace(__name__, f"Send SQL command: {Level.get_select_by_id_string(id)}")
result = self._context.select(Level.get_select_by_id_string(id))[0]
return self._level_from_result(self._context.select(Level.get_select_by_id_string(id))[0])
return self._level_from_result(result)
def find_level_by_id(self, id: int) -> Optional[Level]:
self._logger.trace(__name__, f"Send SQL command: {Level.get_select_by_id_string(id)}")
@@ -60,19 +65,21 @@ class LevelRepositoryService(LevelRepositoryABC):
return self._level_from_result(result[0])
def get_levels_by_server_id(self, server_id: int) -> List[Level]:
levels = List(Level)
self._logger.trace(
__name__,
f"Send SQL command: {Level.get_select_by_server_id_string(server_id)}",
)
return List(
Level,
[
self._level_from_result(result)
for result in self._context.select(Level.get_select_by_server_id_string(server_id))
],
)
results = self._context.select(Level.get_select_by_server_id_string(server_id))
for result in results:
self._logger.trace(__name__, f"Get level with id {result[0]}")
levels.append(self._level_from_result(result))
return levels
def find_levels_by_server_id(self, server_id: int) -> Optional[List[Level]]:
levels = List(Level)
self._logger.trace(
__name__,
f"Send SQL command: {Level.get_select_by_server_id_string(server_id)}",
@@ -81,10 +88,11 @@ class LevelRepositoryService(LevelRepositoryABC):
if results is None or len(results) == 0:
return None
return List(
Level,
[self._level_from_result(result) for result in results],
)
for result in results:
self._logger.trace(__name__, f"Get level with id {result[0]}")
levels.append(self._level_from_result(result))
return levels
def add_level(self, level: Level):
self._logger.trace(__name__, f"Send SQL command: {level.insert_string}")

View File

@@ -5,8 +5,8 @@ from cpl_query.extension import List
from discord import EntityType
from bot_core.logging.database_logger import DatabaseLogger
from bot_data.abc.scheduled_event_repository_abc import ScheduledEventRepositoryABC
from bot_data.abc.server_repository_abc import ServerRepositoryABC
from bot_data.abc.scheduled_event_repository_abc import ScheduledEventRepositoryABC
from bot_data.model.scheduled_event import ScheduledEvent
from bot_data.model.scheduled_event_interval_enum import ScheduledEventIntervalEnum
@@ -49,34 +49,34 @@ class ScheduledEventRepositoryService(ScheduledEventRepositoryABC):
)
def get_scheduled_events(self) -> List[ScheduledEvent]:
scheduled_events = List(ScheduledEvent)
self._logger.trace(__name__, f"Send SQL command: {ScheduledEvent.get_select_all_string()}")
results = self._context.select(ScheduledEvent.get_select_all_string())
for result in results:
self._logger.trace(__name__, f"Get scheduled_event with id {result[0]}")
scheduled_events.append(self._scheduled_event_from_result(result))
return List(
ScheduledEvent,
[
self._scheduled_event_from_result(result)
for result in self._context.select(ScheduledEvent.get_select_all_string())
],
)
return scheduled_events
def get_scheduled_event_by_id(self, id: int) -> ScheduledEvent:
self._logger.trace(__name__, f"Send SQL command: {ScheduledEvent.get_select_by_id_string(id)}")
result = self._context.select(ScheduledEvent.get_select_by_id_string(id))[0]
return self._scheduled_event_from_result(self._context.select(ScheduledEvent.get_select_by_id_string(id))[0])
return self._scheduled_event_from_result(result)
def get_scheduled_events_by_server_id(self, server_id: int) -> List[ScheduledEvent]:
scheduled_events = List(ScheduledEvent)
self._logger.trace(
__name__,
f"Send SQL command: {ScheduledEvent.get_select_by_server_id_string(server_id)}",
)
results = self._context.select(ScheduledEvent.get_select_by_server_id_string(server_id))
return List(
ScheduledEvent,
[
self._scheduled_event_from_result(result)
for result in self._context.select(ScheduledEvent.get_select_by_server_id_string(server_id))
],
)
for result in results:
self._logger.trace(__name__, f"Get scheduled_event with id {result[0]}")
scheduled_events.append(self._scheduled_event_from_result(result))
return scheduled_events
def add_scheduled_event(self, scheduled_event: ScheduledEvent):
self._logger.trace(__name__, f"Send SQL command: {scheduled_event.insert_string}")

View File

@@ -26,14 +26,15 @@ class ServerConfigRepositoryService(ServerConfigRepositoryABC):
self._servers = servers
def _get_team_role_ids(self, server_id: int) -> List[ServerTeamRoleIdsConfig]:
ids = List(ServerTeamRoleIdsConfig)
self._logger.trace(
__name__,
f"Send SQL command: {ServerTeamRoleIdsConfig.get_select_by_server_id_string(server_id)}",
)
return List(
ServerTeamRoleIdsConfig,
[
results = self._context.select(ServerTeamRoleIdsConfig.get_select_by_server_id_string(server_id))
for result in results:
self._logger.trace(__name__, f"Got ServerTeamRoleIdsConfig with id {result[0]}")
ids.append(
ServerTeamRoleIdsConfig(
result[1],
TeamMemberTypeEnum(result[2]),
@@ -42,23 +43,22 @@ class ServerConfigRepositoryService(ServerConfigRepositoryABC):
result[5],
id=result[0],
)
for result in self._context.select(ServerTeamRoleIdsConfig.get_select_by_server_id_string(server_id))
],
)
)
return ids
def _get_afk_channel_ids(self, server_id: int) -> List[int]:
urls = List(int)
self._logger.trace(
__name__,
f"Send SQL command: {ServerAFKChannelIdsConfig.get_select_by_server_id_string(server_id)}",
)
results = self._context.select(ServerAFKChannelIdsConfig.get_select_by_server_id_string(server_id))
for result in results:
self._logger.trace(__name__, f"Got ServerAFKChannelIdsConfig with id {result[0]}")
urls.append(result[1])
return List(
int,
[
result[1]
for result in self._context.select(ServerAFKChannelIdsConfig.get_select_by_server_id_string(server_id))
],
)
return urls
def _from_result(self, result: tuple) -> ServerConfig:
return ServerConfig(
@@ -102,14 +102,18 @@ class ServerConfigRepositoryService(ServerConfigRepositoryABC):
__name__,
f"Send SQL command: {ServerConfig.get_select_by_server_id_string(server_id)}",
)
return self._from_result(self._context.select(ServerConfig.get_select_by_server_id_string(server_id))[0])
result = self._context.select(ServerConfig.get_select_by_server_id_string(server_id))[0]
return self._from_result(result)
def get_server_config_by_id(self, config_id: int) -> ServerConfig:
self._logger.trace(
__name__,
f"Send SQL command: {ServerConfig.get_select_by_id_string(config_id)}",
)
return self._from_result(self._context.select(ServerConfig.get_select_by_id_string(config_id))[0])
result = self._context.select(ServerConfig.get_select_by_id_string(config_id))[0]
return self._from_result(result)
def add_server_config(self, server_config: ServerConfig):
self._logger.trace(__name__, f"Send SQL command: {server_config.insert_string}")

View File

@@ -25,15 +25,12 @@ class ServerRepositoryService(ServerRepositoryABC):
ServerRepositoryABC.__init__(self)
def get_servers(self) -> List[Server]:
servers = List(Server)
self._logger.trace(__name__, f"Send SQL command: {Server.get_select_all_string()}")
results = self._context.select(Server.get_select_all_string())
for result in results:
servers.append(Server(result[1], result[2], result[3], id=result[0]))
servers = List(
Server,
[
Server(result[1], result[2], result[3], id=result[0])
for result in self._context.select(Server.get_select_all_string())
],
)
self._cache.add_servers(servers)
return servers
@@ -99,7 +96,9 @@ class ServerRepositoryService(ServerRepositoryABC):
if result is None or len(result) == 0:
return None
return Server(result[0][1], result[0][2], result[0][3], id=result[0][0])
result = result[0]
return Server(result[1], result[2], result[3], id=result[0])
def add_server(self, server: Server):
self._logger.trace(__name__, f"Send SQL command: {server.insert_string}")

View File

@@ -43,15 +43,14 @@ class ShortRoleNameRepositoryService(ShortRoleNameRepositoryABC):
)
def get_short_role_names(self) -> List[ShortRoleName]:
short_role_names = List(ShortRoleName)
self._logger.trace(__name__, f"Send SQL command: {ShortRoleName.get_select_all_string()}")
results = self._context.select(ShortRoleName.get_select_all_string())
for result in results:
self._logger.trace(__name__, f"Get short_role_name with id {result[0]}")
short_role_names.append(self._short_role_name_from_result(result))
return List(
ShortRoleName,
[
self._short_role_name_from_result(result)
for result in self._context.select(ShortRoleName.get_select_all_string())
],
)
return short_role_names
def get_short_role_name_by_id(self, id: int) -> ShortRoleName:
self._logger.trace(__name__, f"Send SQL command: {ShortRoleName.get_select_by_id_string(id)}")
@@ -60,30 +59,31 @@ class ShortRoleNameRepositoryService(ShortRoleNameRepositoryABC):
return self._short_role_name_from_result(result)
def find_short_role_names_by_role_id(self, role_id: int) -> List[ShortRoleName]:
short_role_names = List(ShortRoleName)
self._logger.trace(
__name__,
f"Send SQL command: {ShortRoleName.get_select_by_role_id_string(role_id)}",
)
return List(
ShortRoleName,
[
self._short_role_name_from_result(result)
for result in self._context.select(ShortRoleName.get_select_by_role_id_string(role_id))
],
)
results = self._context.select(ShortRoleName.get_select_by_role_id_string(role_id))
for result in results:
self._logger.trace(__name__, f"Get short_role_name with id {result[0]}")
short_role_names.append(self._short_role_name_from_result(result))
return short_role_names
def get_short_role_names_by_server_id(self, server_id: int) -> List[ShortRoleName]:
short_role_names = List(ShortRoleName)
self._logger.trace(
__name__,
f"Send SQL command: {ShortRoleName.get_select_by_server_id_string(server_id)}",
)
return List(
ShortRoleName,
[
self._short_role_name_from_result(result)
for result in self._context.select(ShortRoleName.get_select_by_server_id_string(server_id))
],
)
results = self._context.select(ShortRoleName.get_select_by_server_id_string(server_id))
for result in results:
self._logger.trace(__name__, f"Get short_role_name with id {result[0]}")
short_role_names.append(self._short_role_name_from_result(result))
return short_role_names
def add_short_role_name(self, short_role_name: ShortRoleName):
self._logger.trace(__name__, f"Send SQL command: {short_role_name.insert_string}")

View File

@@ -42,24 +42,23 @@ class SteamSpecialOfferRepositoryService(SteamSpecialOfferRepositoryABC):
)
def get_steam_special_offers(self) -> List[SteamSpecialOffer]:
steam_special_offers = List(SteamSpecialOffer)
self._logger.trace(__name__, f"Send SQL command: {SteamSpecialOffer.get_select_all_string()}")
results = self._context.select(SteamSpecialOffer.get_select_all_string())
for result in results:
self._logger.trace(__name__, f"Get steam_special_offer with id {result[0]}")
steam_special_offers.append(self._steam_special_offer_from_result(result))
return List(
SteamSpecialOffer,
[
self._steam_special_offer_from_result(result)
for result in self._context.select(SteamSpecialOffer.get_select_all_string())
],
)
return steam_special_offers
def get_steam_special_offer_by_name(self, name: str) -> SteamSpecialOffer:
self._logger.trace(
__name__,
f"Send SQL command: {SteamSpecialOffer.get_select_by_name_string(name)}",
)
return self._steam_special_offer_from_result(
self._context.select(SteamSpecialOffer.get_select_by_name_string(name))[0]
)
result = self._context.select(SteamSpecialOffer.get_select_by_name_string(name))[0]
return self._steam_special_offer_from_result(result)
def add_steam_special_offer(self, steam_special_offer: SteamSpecialOffer):
self._logger.trace(__name__, f"Send SQL command: {steam_special_offer.insert_string}")

View File

@@ -18,17 +18,27 @@ class TechnicianConfigRepositoryService(TechnicianConfigRepositoryABC):
self._context = db_context
def _get_technician_ids(self) -> List[int]:
ids = List(int)
self._logger.trace(__name__, f"Send SQL command: {TechnicianIdConfig.get_select_all_string()}")
return List(int, [config[1] for config in self._context.select(TechnicianIdConfig.get_select_all_string())])
results = self._context.select(TechnicianIdConfig.get_select_all_string())
for result in results:
self._logger.trace(__name__, f"Got TechnicianId with id {result[0]}")
ids.append(result[1])
return ids
def _get_technician_ping_urls(self) -> List[str]:
urls = List(str)
self._logger.trace(
__name__,
f"Send SQL command: {TechnicianPingUrlConfig.get_select_all_string()}",
)
return List(
str, [ping_url[1] for ping_url in self._context.select(TechnicianPingUrlConfig.get_select_all_string())]
)
results = self._context.select(TechnicianPingUrlConfig.get_select_all_string())
for result in results:
self._logger.trace(__name__, f"Got TechnicianPingUrl with id {result[0]}")
urls.append(result[1])
return urls
def _from_result(self, result: tuple) -> TechnicianConfig:
return TechnicianConfig(
@@ -54,7 +64,9 @@ class TechnicianConfigRepositoryService(TechnicianConfigRepositoryABC):
def get_technician_config(self) -> TechnicianConfig:
self._logger.trace(__name__, f"Send SQL command: {TechnicianConfig.get_select_all_string()}")
return self._from_result(self._context.select(TechnicianConfig.get_select_all_string())[0])
result = self._context.select(TechnicianConfig.get_select_all_string())[0]
return self._from_result(result)
def add_technician_config(self, technician_config: TechnicianConfig):
self._logger.trace(__name__, f"Send SQL command: {technician_config.insert_string}")

View File

@@ -39,47 +39,46 @@ class UserGameIdentRepositoryService(UserGameIdentRepositoryABC):
)
def get_user_game_idents(self) -> List[UserGameIdent]:
joins = List(UserGameIdent)
self._logger.trace(
__name__,
f"Send SQL command: {UserGameIdent.get_select_all_string()}",
)
results = self._context.select(UserGameIdent.get_select_all_string())
for result in results:
self._logger.trace(__name__, f"Get UserGameIdent with id {result[0]}")
joins.append(self._from_result(result))
return List(
UserGameIdent,
[
self._from_result(game_ident)
for game_ident in self._context.select(UserGameIdent.get_select_all_string())
],
)
return joins
def get_user_game_idents_by_game_server_id(self, game_server_id: int) -> List[UserGameIdent]:
joins = List(UserGameIdent)
self._logger.trace(
__name__,
f"Send SQL command: {UserGameIdent.get_select_by_game_server_id_string(game_server_id)}",
)
return List(
UserGameIdent,
[
self._from_result(game_ident)
for game_ident in self._context.select(
UserGameIdent.get_select_by_game_server_id_string(game_server_id)
)
],
)
results = self._context.select(UserGameIdent.get_select_by_game_server_id_string(game_server_id))
for result in results:
self._logger.trace(__name__, f"Get UserGameIdent with id {result[0]}")
joins.append(self._from_result(result))
return joins
def get_user_game_ident_by_id(self, id: int) -> UserGameIdent:
self._logger.trace(
__name__,
f"Send SQL command: {UserGameIdent.get_select_by_id_string(id)}",
)
return self._from_result(self._context.select(UserGameIdent.get_select_by_id_string(id))[0])
result = self._context.select(UserGameIdent.get_select_by_id_string(id))[0]
return self._from_result(result)
def get_user_game_ident_by_ident(self, ident: str) -> UserGameIdent:
self._logger.trace(
__name__,
f"Send SQL command: {UserGameIdent.get_select_by_ident_string(ident)}",
)
return self._from_result(self._context.select(UserGameIdent.get_select_by_ident_string(ident))[0])
result = self._context.select(UserGameIdent.get_select_by_ident_string(ident))[0]
return self._from_result(result)
def find_user_game_ident_by_ident(self, ident: str) -> Optional[UserGameIdent]:
self._logger.trace(
@@ -90,20 +89,20 @@ class UserGameIdentRepositoryService(UserGameIdentRepositoryABC):
if len(result) == 0:
return None
return self._from_result(result[0])
result = result[0]
return self._from_result(result)
def get_user_game_idents_by_user_id(self, user_id: int) -> List[UserGameIdent]:
joins = List(UserGameIdent)
self._logger.trace(
__name__,
f"Send SQL command: {UserGameIdent.get_select_by_user_id_string(user_id)}",
)
return List(
UserGameIdent,
[
self._from_result(game_ident)
for game_ident in self._context.select(UserGameIdent.get_select_by_user_id_string(user_id))
],
)
results = self._context.select(UserGameIdent.get_select_by_user_id_string(user_id))
for result in results:
joins.append(self._from_result(result))
return joins
def add_user_game_ident(self, user_game_ident: UserGameIdent):
self._logger.trace(__name__, f"Send SQL command: {user_game_ident.insert_string}")

View File

@@ -39,45 +39,45 @@ class UserJoinedGameServerRepositoryService(UserJoinedGameServerRepositoryABC):
)
def get_user_joined_game_servers(self) -> List[UserJoinedGameServer]:
joins = List(UserJoinedGameServer)
self._logger.trace(
__name__,
f"Send SQL command: {UserJoinedGameServer.get_select_all_string()}",
)
results = self._context.select(UserJoinedGameServer.get_select_all_string())
for result in results:
self._logger.trace(__name__, f"Get user-joined-game-server with id {result[0]}")
joins.append(self._from_result(result))
return List(
UserJoinedGameServer,
[self._from_result(join) for join in self._context.select(UserJoinedGameServer.get_select_all_string())],
)
return joins
def get_user_joined_game_server_by_id(self, id: int) -> UserJoinedGameServer:
self._logger.trace(
__name__,
f"Send SQL command: {UserJoinedGameServer.get_select_by_id_string(id)}",
)
return self._from_result(self._context.select(UserJoinedGameServer.get_select_by_id_string(id))[0])
result = self._context.select(UserJoinedGameServer.get_select_by_id_string(id))[0]
return self._from_result(result)
def get_user_joined_game_servers_by_user_id(self, user_id: int) -> List[UserJoinedGameServer]:
joins = List(UserJoinedGameServer)
self._logger.trace(
__name__,
f"Send SQL command: {UserJoinedGameServer.get_select_by_user_id_string(user_id)}",
)
results = self._context.select(UserJoinedGameServer.get_select_by_user_id_string(user_id))
for result in results:
joins.append(self._from_result(result))
return List(
UserJoinedGameServer,
[
self._from_result(join)
for join in self._context.select(UserJoinedGameServer.get_select_by_user_id_string(user_id))
],
)
return joins
def get_active_user_joined_game_server_by_user_id(self, user_id: int) -> UserJoinedGameServer:
self._logger.trace(
__name__,
f"Send SQL command: {UserJoinedGameServer.get_select_by_user_id_string(user_id)}",
)
return self._from_result(
self._context.select(UserJoinedGameServer.get_select_active_by_user_id_string(user_id))[0]
)
result = self._context.select(UserJoinedGameServer.get_select_active_by_user_id_string(user_id))[0]
return self._from_result(result)
def find_active_user_joined_game_server_by_user_id(self, user_id: int) -> Optional[UserJoinedGameServer]:
self._logger.trace(
@@ -88,21 +88,22 @@ class UserJoinedGameServerRepositoryService(UserJoinedGameServerRepositoryABC):
if result is None or len(result) == 0:
return None
return self._from_result(result[0])
result = result[0]
return self._from_result(result)
def find_active_user_joined_game_servers_by_user_id(self, user_id: int) -> List[Optional[UserJoinedGameServer]]:
self._logger.trace(
__name__,
f"Send SQL command: {UserJoinedGameServer.get_select_active_by_user_id_string(user_id)}",
)
result = List(UserJoinedGameServer)
db_results = self._context.select(UserJoinedGameServer.get_select_active_by_user_id_string(user_id))
return List(
UserJoinedGameServer,
[
self._from_result(join)
for join in self._context.select(UserJoinedGameServer.get_select_active_by_user_id_string(user_id))
],
)
for db_result in db_results:
result.append(self._from_result(db_result))
return result
def add_user_joined_game_server(self, user_joined_game_server: UserJoinedGameServer):
self._logger.trace(__name__, f"Send SQL command: {user_joined_game_server.insert_string}")

View File

@@ -23,7 +23,31 @@ class UserJoinedServerRepositoryService(UserJoinedServerRepositoryABC):
UserJoinedServerRepositoryABC.__init__(self)
def __from_result(self, result: tuple) -> UserJoinedServer:
def get_user_joined_servers(self) -> List[UserJoinedServer]:
joins = List(UserJoinedServer)
self._logger.trace(__name__, f"Send SQL command: {UserJoinedServer.get_select_all_string()}")
results = self._context.select(UserJoinedServer.get_select_all_string())
for result in results:
self._logger.trace(__name__, f"Get user-joined-server with id {result[0]}")
joins.append(
UserJoinedServer(
self._users.get_user_by_id(result[1]),
result[2],
result[3],
result[4],
result[5],
id=result[0],
)
)
return joins
def get_user_joined_server_by_id(self, id: int) -> UserJoinedServer:
self._logger.trace(
__name__,
f"Send SQL command: {UserJoinedServer.get_select_by_id_string(id)}",
)
result = self._context.select(UserJoinedServer.get_select_by_id_string(id))[0]
return UserJoinedServer(
self._users.get_user_by_id(result[1]),
result[2],
@@ -33,41 +57,55 @@ class UserJoinedServerRepositoryService(UserJoinedServerRepositoryABC):
id=result[0],
)
def get_user_joined_servers(self) -> List[UserJoinedServer]:
self._logger.trace(__name__, f"Send SQL command: {UserJoinedServer.get_select_all_string()}")
return List(
UserJoinedServer,
[self.__from_result(join) for join in self._context.select(UserJoinedServer.get_select_all_string())],
)
def get_user_joined_server_by_id(self, id: int) -> UserJoinedServer:
def get_user_joined_server_by_server_id(self, server_id: int) -> UserJoinedServer:
self._logger.trace(
__name__,
f"Send SQL command: {UserJoinedServer.get_select_by_id_string(id)}",
f"Send SQL command: {UserJoinedServer.get(id)}",
)
result = self._context.select(UserJoinedServer.get_select_by_id_string(id))[0]
return UserJoinedServer(
self._users.get_user_by_id(result[1]),
result[2],
result[3],
result[4],
result[5],
id=result[0],
)
return self.__from_result(self._context.select(UserJoinedServer.get_select_by_id_string(id))[0])
def get_user_joined_servers_by_user_id(self, user_id: int) -> List[UserJoinedServer]:
joins = List(UserJoinedServer)
self._logger.trace(
__name__,
f"Send SQL command: {UserJoinedServer.get_select_by_user_id_string(user_id)}",
)
return List(
UserJoinedServer,
[
self.__from_result(join)
for join in self._context.select(UserJoinedServer.get_select_by_user_id_string(user_id))
],
)
results = self._context.select(UserJoinedServer.get_select_by_user_id_string(user_id))
for result in results:
joins.append(
UserJoinedServer(
self._users.get_user_by_id(result[1]),
result[2],
result[3],
result[4],
result[5],
id=result[0],
)
)
return joins
def get_active_user_joined_server_by_user_id(self, user_id: int) -> UserJoinedServer:
self._logger.trace(
__name__,
f"Send SQL command: {UserJoinedServer.get_select_by_user_id_string(user_id)}",
)
return self.__from_result(
self._context.select(UserJoinedServer.get_select_active_by_user_id_string(user_id))[0]
result = self._context.select(UserJoinedServer.get_select_active_by_user_id_string(user_id))[0]
return UserJoinedServer(
self._users.get_user_by_id(result[1]),
result[2],
result[3],
result[4],
result[5],
id=result[0],
)
def find_active_user_joined_server_by_user_id(self, user_id: int) -> Optional[UserJoinedServer]:
@@ -79,7 +117,16 @@ class UserJoinedServerRepositoryService(UserJoinedServerRepositoryABC):
if result is None or len(result) == 0:
return None
return self.__from_result(result[0])
result = result[0]
return UserJoinedServer(
self._users.get_user_by_id(result[1]),
result[2],
result[3],
result[4],
result[5],
id=result[0],
)
def add_user_joined_server(self, user_joined_server: UserJoinedServer):
self._logger.trace(__name__, f"Send SQL command: {user_joined_server.insert_string}")

View File

@@ -25,7 +25,34 @@ class UserJoinedVoiceChannelRepositoryService(UserJoinedVoiceChannelRepositoryAB
UserJoinedVoiceChannelRepositoryABC.__init__(self)
def __from_result(self, result: tuple) -> UserJoinedVoiceChannel:
def get_user_joined_voice_channels(self) -> List[UserJoinedVoiceChannel]:
joins = List(UserJoinedVoiceChannel)
self._logger.trace(
__name__,
f"Send SQL command: {UserJoinedVoiceChannel.get_select_all_string()}",
)
results = self._context.select(UserJoinedVoiceChannel.get_select_all_string())
for result in results:
self._logger.trace(__name__, f"Get user-joined-voice-channel with id {result[0]}")
joins.append(
UserJoinedVoiceChannel(
self._users.get_user_by_id(result[1]),
result[2],
result[3],
result[4],
result[5],
id=result[0],
)
)
return joins
def get_user_joined_voice_channel_by_id(self, id: int) -> UserJoinedVoiceChannel:
self._logger.trace(
__name__,
f"Send SQL command: {UserJoinedVoiceChannel.get_select_by_id_string(id)}",
)
result = self._context.select(UserJoinedVoiceChannel.get_select_by_id_string(id))[0]
return UserJoinedVoiceChannel(
self._users.get_user_by_id(result[1]),
result[2],
@@ -35,45 +62,40 @@ class UserJoinedVoiceChannelRepositoryService(UserJoinedVoiceChannelRepositoryAB
id=result[0],
)
def get_user_joined_voice_channels(self) -> List[UserJoinedVoiceChannel]:
self._logger.trace(
__name__,
f"Send SQL command: {UserJoinedVoiceChannel.get_select_all_string()}",
)
return List(
UserJoinedVoiceChannel,
[self.__from_result(join) for join in self._context.select(UserJoinedVoiceChannel.get_select_all_string())],
)
def get_user_joined_voice_channel_by_id(self, id: int) -> UserJoinedVoiceChannel:
self._logger.trace(
__name__,
f"Send SQL command: {UserJoinedVoiceChannel.get_select_by_id_string(id)}",
)
return self.__from_result(self._context.select(UserJoinedVoiceChannel.get_select_by_id_string(id))[0])
def get_user_joined_voice_channels_by_user_id(self, user_id: int) -> List[UserJoinedVoiceChannel]:
joins = List(UserJoinedVoiceChannel)
self._logger.trace(
__name__,
f"Send SQL command: {UserJoinedVoiceChannel.get_select_by_user_id_string(user_id)}",
)
results = self._context.select(UserJoinedVoiceChannel.get_select_by_user_id_string(user_id))
for result in results:
joins.append(
UserJoinedVoiceChannel(
self._users.get_user_by_id(result[1]),
result[2],
result[3],
result[4],
result[5],
id=result[0],
)
)
return List(
UserJoinedVoiceChannel,
[
self.__from_result(join)
for join in self._context.select(UserJoinedVoiceChannel.get_select_by_user_id_string(user_id))
],
)
return joins
def get_active_user_joined_voice_channel_by_user_id(self, user_id: int) -> UserJoinedVoiceChannel:
self._logger.trace(
__name__,
f"Send SQL command: {UserJoinedVoiceChannel.get_select_by_user_id_string(user_id)}",
)
return self.__from_result(
self._context.select(UserJoinedVoiceChannel.get_select_active_by_user_id_string(user_id))[0]
result = self._context.select(UserJoinedVoiceChannel.get_select_active_by_user_id_string(user_id))[0]
return UserJoinedVoiceChannel(
self._users.get_user_by_id(result[1]),
result[2],
result[3],
result[4],
result[5],
id=result[0],
)
def find_active_user_joined_voice_channel_by_user_id(self, user_id: int) -> Optional[UserJoinedVoiceChannel]:
@@ -85,21 +107,38 @@ class UserJoinedVoiceChannelRepositoryService(UserJoinedVoiceChannelRepositoryAB
if result is None or len(result) == 0:
return None
return self.__from_result(result[0])
result = result[0]
return UserJoinedVoiceChannel(
self._users.get_user_by_id(result[1]),
result[2],
result[3],
result[4],
result[5],
id=result[0],
)
def find_active_user_joined_voice_channels_by_user_id(self, user_id: int) -> List[Optional[UserJoinedVoiceChannel]]:
self._logger.trace(
__name__,
f"Send SQL command: {UserJoinedVoiceChannel.get_select_active_by_user_id_string(user_id)}",
)
result = List(UserJoinedVoiceChannel)
db_results = self._context.select(UserJoinedVoiceChannel.get_select_active_by_user_id_string(user_id))
return List(
UserJoinedVoiceChannel,
[
self.__from_result(join)
for join in self._context.select(UserJoinedVoiceChannel.get_select_active_by_user_id_string(user_id))
],
)
for db_result in db_results:
result.append(
UserJoinedVoiceChannel(
self._users.get_user_by_id(db_result[1]),
db_result[2],
db_result[3],
db_result[4],
db_result[5],
id=db_result[0],
)
)
return result
def add_user_joined_voice_channel(self, user_joined_voice_channel: UserJoinedVoiceChannel):
self._logger.trace(__name__, f"Send SQL command: {user_joined_voice_channel.insert_string}")

View File

@@ -44,30 +44,39 @@ class UserMessageCountPerHourRepositoryService(UserMessageCountPerHourRepository
)
def get_user_message_count_per_hours(self) -> List[UserMessageCountPerHour]:
umcphs = List(UserMessageCountPerHour)
self._logger.trace(
__name__,
f"Send SQL command: {UserMessageCountPerHour.get_select_all_string()}",
)
results = self._context.select(UserMessageCountPerHour.get_select_all_string())
for result in results:
self._logger.trace(__name__, f"Get user message count per hour with id {result[0]}")
umcphs.append(self._from_result(result))
return List(
UserMessageCountPerHour,
[
self._from_result(umcphs)
for umcphs in self._context.select(UserMessageCountPerHour.get_select_all_string())
],
)
return umcphs
def find_user_message_count_per_hour_by_user_id(self, user_id: int) -> List[Optional[UserMessageCountPerHour]]:
umcphs = List(UserMessageCountPerHour)
sql = UserMessageCountPerHour.get_select_by_user_id_string(user_id)
self._logger.trace(__name__, f"Send SQL command: {sql}")
return List(UserMessageCountPerHour, [self._from_result(umcphs) for umcphs in self._context.select(sql)])
results = self._context.select(sql)
if results is None or len(results) == 0:
return umcphs
for result in results:
self._logger.trace(__name__, f"Get user message count per hour with id {result[0]}")
umcphs.append(self._from_result(result))
return umcphs
def get_user_message_count_per_hour_by_user_id_and_date(
self, user_id: int, date: datetime
) -> UserMessageCountPerHour:
sql = UserMessageCountPerHour.get_select_by_user_id_and_date_string(user_id, date)
self._logger.trace(__name__, f"Send SQL command: {sql}")
return self._from_result(self._context.select(sql)[0])
result = self._context.select(sql)[0]
return self._from_result(result)
def find_user_message_count_per_hour_by_user_id_and_date(
self, user_id: int, date: datetime

View File

@@ -1,3 +1,4 @@
import datetime
from typing import Optional
from cpl_core.database.context import DatabaseContextABC
@@ -37,8 +38,14 @@ class UserRepositoryService(UserRepositoryABC):
)
def get_users(self) -> List[User]:
users = List(User)
self._logger.trace(__name__, f"Send SQL command: {User.get_select_all_string()}")
return List(User, [self._from_result(user) for user in self._context.select(User.get_select_all_string())])
results = self._context.select(User.get_select_all_string())
for result in results:
self._logger.trace(__name__, f"Get user with id {result[0]}")
users.append(self._from_result(result))
return users
def get_user_by_id(self, id: int) -> User:
self._logger.trace(__name__, f"Send SQL command: {User.get_select_by_id_string(id)}")
@@ -52,40 +59,42 @@ class UserRepositoryService(UserRepositoryABC):
if result is None or len(result) == 0:
return None
return self._from_result(result[0])
result = result[0]
return self._from_result(result)
def get_users_by_discord_id(self, discord_id: int) -> List[User]:
users = List(User)
self._logger.trace(
__name__,
f"Send SQL command: {User.get_select_by_discord_id_string(discord_id)}",
)
return List(
User,
[
self._from_result(user)
for user in self._context.select(User.get_select_by_discord_id_string(discord_id))
],
)
results = self._context.select(User.get_select_by_discord_id_string(discord_id))
for result in results:
users.append(self._from_result(result))
return users
def get_users_by_server_id(self, server_id: int) -> List[User]:
users = List(User)
self._logger.trace(
__name__,
f"Send SQL command: {User.get_select_by_server_id_string(server_id)}",
)
return List(
User,
[self._from_result(user) for user in self._context.select(User.get_select_by_server_id_string(server_id))],
)
results = self._context.select(User.get_select_by_server_id_string(server_id))
for result in results:
users.append(self._from_result(result))
return users
def get_user_by_discord_id_and_server_id(self, discord_id: int, server_id: int) -> User:
self._logger.trace(
__name__,
f"Send SQL command: {User.get_select_by_discord_id_and_server_id_string(discord_id, server_id)}",
)
result = self._context.select(User.get_select_by_discord_id_and_server_id_string(discord_id, server_id))[0]
return self._from_result(
self._context.select(User.get_select_by_discord_id_and_server_id_string(discord_id, server_id))[0]
)
return self._from_result(result)
def find_user_by_discord_id_and_server_id(self, discord_id: int, server_id: int) -> Optional[User]:
self._logger.trace(
@@ -96,7 +105,9 @@ class UserRepositoryService(UserRepositoryABC):
if result is None or len(result) == 0:
return None
return self._from_result(result[0])
result = result[0]
return self._from_result(result)
def add_user(self, user: User):
self._logger.trace(__name__, f"Send SQL command: {user.insert_string}")

View File

@@ -46,29 +46,30 @@ class UserWarningsRepositoryService(UserWarningsRepositoryABC):
)
def get_user_warnings(self) -> List[UserWarnings]:
warnings = List(UserWarnings)
self._logger.trace(__name__, f"Send SQL command: {UserWarnings.get_select_all_string()}")
return List(
UserWarnings,
[self._from_result(warning) for warning in self._context.select(UserWarnings.get_select_all_string())],
)
results = self._context.select(UserWarnings.get_select_all_string())
for result in results:
warnings.append(self._from_result(result))
return warnings
def get_user_warnings_by_id(self, id: int) -> UserWarnings:
self._logger.trace(__name__, f"Send SQL command: {UserWarnings.get_select_by_id_string(id)}")
return self._from_result(self._context.select(UserWarnings.get_select_by_id_string(id))[0])
result = self._context.select(UserWarnings.get_select_by_id_string(id))[0]
return self._from_result(result)
def get_user_warnings_by_user_id(self, user_id: int) -> List[UserWarnings]:
warnings = List(UserWarnings)
self._logger.trace(
__name__,
f"Send SQL command: {UserWarnings.get_select_by_user_id_string(user_id)}",
)
results = self._context.select(UserWarnings.get_select_by_user_id_string(user_id))
for result in results:
warnings.append(self._from_result(result))
return List(
UserWarnings,
[
self._from_result(warning)
for warning in self._context.select(UserWarnings.get_select_by_user_id_string(user_id))
],
)
return warnings
def add_user_warnings(self, user_warnings: UserWarnings):
self._logger.trace(__name__, f"Send SQL command: {user_warnings.insert_string}")

View File

@@ -67,7 +67,6 @@ class BaseOnMessageEvent(OnMessageABC):
return
settings: ServerConfig = self._config.get_configuration(f"ServerConfig_{server.discord_id}")
self._client_utils.update_user_message_xp_count_by_hour(message.created_at, user, settings)
if self._client_utils.is_message_xp_count_by_hour_higher_that_max_message_count_per_hour(
message.created_at, user, settings
):

View File

@@ -73,7 +73,6 @@ class BaseReactionHandler:
settings: ServerConfig = self._config.get_configuration(f"ServerConfig_{guild.id}")
if r_type == "add":
self._client_utils.update_user_message_xp_count_by_hour(datetime.now(), user, settings, is_reaction=True)
if self._client_utils.is_message_xp_count_by_hour_higher_that_max_message_count_per_hour(
datetime.now(), user, settings, is_reaction=True
):

View File

@@ -28,5 +28,7 @@ class ConfigOnReadyEvent(OnReadyABC):
self._data_integrity_service = data_integrity_service
async def on_ready(self):
self._data_integrity_service.check_servers()
for guild in self._bot.guilds:
await self._config_service.reload_server_config(self._servers.get_server_by_discord_id(guild.id))