This repository has been archived on 2022-07-14. You can view files and clone it, but cannot push or open issues or pull requests.
sh_gismo/src/modules/base/base.py

279 lines
12 KiB
Python

from datetime import datetime
from typing import Union, Optional
import discord
from cpl_core.configuration import ConfigurationABC
from cpl_core.database.context import DatabaseContextABC
from cpl_core.logging.logger_abc import LoggerABC
from gismo_core.abc.bot_service_abc import BotServiceABC
from gismo_core.abc.message_service_abc import MessageServiceABC
from gismo_data.abc.client_repository_abc import ClientRepositoryABC
from gismo_data.abc.known_user_repository_abc import KnownUserRepositoryABC
from gismo_data.abc.server_repository_abc import ServerRepositoryABC
from gismo_data.abc.user_joined_server_repository_abc import \
UserJoinedServerRepositoryABC
from gismo_data.abc.user_joined_voice_channel_abc import UserJoinedVoiceChannelRepositoryABC
from gismo_data.abc.user_repository_abc import UserRepositoryABC
from gismo_data.model.known_user import KnownUser
from gismo_data.model.server import Server
from gismo_data.model.user import User
from gismo_data.model.user_joined_server import UserJoinedServer
from gismo_data.model.user_joined_voice_channel import UserJoinedVoiceChannel
from modules.base.base_settings import BaseSettings
from modules.base.service.afk_command_service import AFKCommandService
from modules.base.service.ping_command_service import PingCommandService
from modules.base.service.purge_command_service import PurgeCommandService
from modules.permission.abc.permission_service_abc import PermissionServiceABC
from gismo_core.abc.events.on_member_join_abc import OnMemberJoinABC
from gismo_core.abc.events.on_member_remove_abc import OnMemberRemoveABC
from gismo_core.abc.events.on_message_abc import OnMessageABC
from gismo_core.abc.events.on_voice_state_update_abc import \
OnVoiceStateUpdateABC
from gismo_core.abc.module_abc import ModuleABC
class Base(ModuleABC, OnMemberJoinABC, OnMemberRemoveABC, OnMessageABC, OnVoiceStateUpdateABC):
    """Core bot module that reacts to member, message and voice events.

    On these events the module keeps the known-user, user, server-join and
    voice-channel-join repositories up to date, counts received messages per
    client and awards xp for messages and for time spent in voice channels.
    """

    def __init__(
            self,
            config: ConfigurationABC,
            logger: LoggerABC,
            clients: ClientRepositoryABC,
            servers: ServerRepositoryABC,
            known_users: KnownUserRepositoryABC,
            users: UserRepositoryABC,
            user_joins: UserJoinedServerRepositoryABC,
            user_joins_vc: UserJoinedVoiceChannelRepositoryABC,
            bot: BotServiceABC,
            db: DatabaseContextABC,
            messenger: MessageServiceABC,
            permission_service: PermissionServiceABC,
            ping_command: PingCommandService,
            purge_command: PurgeCommandService,
            afk_command: AFKCommandService
    ):
        """Store the injected services, initialise the module and register the commands.

        Args:
            config: Source of the per-guild ``BaseSettings`` (see ``_get_config``).
            logger: Logger used for all diagnostics of this module.
            clients: Repository holding the per-client message counters.
            servers: Repository used to resolve a guild id to a ``Server``.
            known_users: Repository of users the bot has seen at least once.
            users: Repository of per-server users (carries the xp value).
            user_joins: Repository of server join/leave records.
            user_joins_vc: Repository of voice channel join/leave records.
            bot: Bot service; provides ``user.id`` and ``add_cog``.
            db: Database context; ``save_changes`` is called after each write.
            messenger: Service used to send direct messages.
            permission_service: Provides the admins and moderators to notify.
            ping_command: Command service registered on the bot via ``add_cog``.
            purge_command: Command service registered on the bot via ``add_cog``.
            afk_command: Command service registered on the bot via ``add_cog``.
        """
        self._config = config
        self._logger = logger
        self._clients = clients
        self._servers = servers
        self._known_users = known_users
        self._users = users
        self._user_joins = user_joins
        self._user_joins_vc = user_joins_vc
        self._bot = bot
        self._db = db
        self._messenger = messenger
        self._permission_service = permission_service

        # NOTE(review): the numbers are presumably per-event priorities and the
        # list the settings classes of this module - confirm against ModuleABC.
        ModuleABC.__init__(
            self,
            {
                OnMemberJoinABC: 1,
                OnMemberRemoveABC: 1,
                OnMessageABC: 30,
                OnVoiceStateUpdateABC: 10
            },
            [BaseSettings]
        )

        self._bot.add_cog(ping_command)
        self._bot.add_cog(purge_command)
        self._bot.add_cog(afk_command)
        self._logger.info(__name__, f'Module {type(self)} loaded')

    def _get_config(self, g_id: int) -> BaseSettings:
        """Return the configuration stored under ``'<ClassName>_<g_id>'``.

        Args:
            g_id: Discord guild id the settings belong to.
        """
        return self._config.get_configuration(f'{type(self).__name__}_{g_id}')

    def _append_received_message_count(self, g_id: int) -> None:
        """Increase the received-message counter of this bot client by one.

        Failures are logged and not re-raised.

        Args:
            g_id: Discord guild id the message was received in.
        """
        try:
            self._clients.append_received_message_count(self._bot.user.id, g_id, 1)
            self._db.save_changes()
        except Exception as e:
            self._logger.error(__name__, f'Cannot edit client {self._bot.user.id}@{g_id}', e)

    def _append_deleted_message_count(self, g_id: int) -> None:
        """Increase the deleted-message counter of this bot client by one.

        Failures are logged and not re-raised.

        Args:
            g_id: Discord guild id the message was deleted in.
        """
        try:
            # The received counter was incremented here before (copy of the
            # method above), which booked deletions as received messages.
            # NOTE(review): assumes ClientRepositoryABC provides
            # append_deleted_message_count - confirm.
            self._clients.append_deleted_message_count(self._bot.user.id, g_id, 1)
            self._db.save_changes()
        except Exception as e:
            self._logger.error(__name__, f'Cannot edit client {self._bot.user.id}@{g_id}', e)

    def _check_for_known_user(self, member: Union[discord.User, discord.Member]) -> None:
        """Add ``member`` to the known users unless it is already stored.

        Failures are logged and not re-raised.

        Args:
            member: Discord user or member to look up by its id.
        """
        self._logger.debug(__name__, f'Check if user is already known {member}')
        try:
            user = self._known_users.find_user_by_discord_id(member.id)
            if user is not None:
                return

            self._logger.debug(__name__, f'Add user: {member.id}')
            self._known_users.add_user(KnownUser(member.id))
            self._db.save_changes()
        except Exception as e:
            self._logger.error(__name__, f'Cannot get user {member.id}', e)

    async def _add_if_not_exists_user(self, member: Union[discord.User, discord.Member]) -> None:
        """Welcome ``member``, notify the team and record the server join.

        The welcome message goes to the member, the team message to every
        admin and moderator returned by the permission service. Afterwards a
        ``UserJoinedServer`` record is stored; the ``User`` itself is created
        first when it does not exist on this server yet. Database failures are
        logged and not re-raised; failures while sending messages propagate.

        Args:
            member: Member that joined; ``member.guild`` is read.
        """
        self._logger.debug(__name__, f'Check if user exists {member}')
        settings: BaseSettings = self._get_config(member.guild.id)
        await self._messenger.send_dm_message(settings.welcome_message.format(member.guild.name), member)

        for admin in self._permission_service.get_admins():
            await self._messenger.send_dm_message(settings.welcome_message_for_team.format(member.name), admin)

        for moderator in self._permission_service.get_moderators():
            await self._messenger.send_dm_message(settings.welcome_message_for_team.format(member.name), moderator)

        try:
            server = self._servers.get_server_by_discord_id(member.guild.id)
            user = self._users.find_user_by_discord_id_and_server_id(member.id, server.server_id)
            if user is not None:
                self._user_joins.add_user_joined_server(UserJoinedServer(user, datetime.now()))
                # Persist the re-join as well; without the save the record
                # added above was never written for returning users.
                self._db.save_changes()
                return

            self._logger.debug(__name__, f'Add user: {member.id}')
            self._users.add_user(User(member.id, 0, server))
            self._db.save_changes()
            # Read the user back after saving before attaching the join record to it.
            user = self._users.get_user_by_discord_id_and_server_id(member.id, server.server_id)
            self._user_joins.add_user_joined_server(UserJoinedServer(user, datetime.now()))
            self._db.save_changes()
        except Exception as e:
            self._logger.error(__name__, f'Cannot get user {member.id}', e)

    async def _remove_user(self, member: Union[discord.User, discord.Member]) -> None:
        """Send the goodbye message and close the active server join of ``member``.

        Database failures are logged and not re-raised; a failure while
        sending the message propagates.

        Args:
            member: Member that left; ``member.guild`` is read.
        """
        self._logger.debug(__name__, f'Remove user {member}')
        settings: BaseSettings = self._get_config(member.guild.id)
        await self._messenger.send_dm_message(settings.goodbye_message, member)

        try:
            server = self._servers.get_server_by_discord_id(member.guild.id)
            user = self._users.find_user_by_discord_id_and_server_id(member.id, server.server_id)
            if user is None:
                self._logger.error(__name__, f'Cannot find user {member}')
                return

            join = self._user_joins.get_active_user_joined_server_by_user_id(user.user_id)
            join.leaved_on = datetime.now()
            self._user_joins.update_user_joined_server(join)
            self._db.save_changes()
        except Exception as e:
            self._logger.error(__name__, f'Cannot get user {member.id}', e)

    def _update_voice_state(self, joined: bool, dc_user_id: int, dc_channel_id: int, server: Server) -> None:
        """Open or close a voice channel session and award xp when it is closed.

        Args:
            joined: ``True`` to store a new session, ``False`` to close the
                active one and add xp for its duration.
            dc_user_id: Discord id of the user.
            dc_channel_id: Discord id of the voice channel; only used when joining.
            server: Server the user belongs to.
        """
        user: Optional[User] = None
        try:
            user = self._users.get_user_by_discord_id_and_server_id(dc_user_id, server.server_id)
        except Exception as e:
            self._logger.error(__name__, f'Cannot get user {dc_user_id}', e)
            return

        if user is None:
            self._logger.error(__name__, f'User not found {dc_user_id}')
            return

        try:
            if joined:
                join = UserJoinedVoiceChannel(user, dc_channel_id, datetime.now())
                self._user_joins_vc.add_user_joined_voice_channel(join)
                self._db.save_changes()
                return

            settings: BaseSettings = self._get_config(server.discord_server_id)

            join = self._user_joins_vc.get_active_user_joined_voice_channel_by_user_id(user.user_id)
            join.leaved_on = datetime.now()

            # ontime as hours
            ontime = round((join.leaved_on - join.joined_on).total_seconds() / 3600, 2)
            old_xp = user.xp
            user.xp += round(ontime * settings.xp_per_ontime_hour)

            self._user_joins_vc.update_user_joined_voice_channel(join)
            self._users.update_user(user)
            self._db.save_changes()

            self._logger.debug(__name__, f'User {user} leaved_on {join.leaved_on}. Ontime: {ontime}h | xp: from {old_xp} to {user.xp}')
        except Exception as e:
            self._logger.error(__name__, 'Ontime validation failed', e)

    def _handle_message_for_xp(self, message: discord.Message) -> None:
        """Add ``xp_per_message`` to the author of ``message``.

        Lookup failures for the server or the user are logged and end the
        call; errors raised while updating or saving the user propagate.

        Args:
            message: Guild message; ``message.guild`` and ``message.author`` are read.
        """
        dc_user_id = message.author.id
        try:
            server = self._servers.get_server_by_discord_id(message.guild.id)
        except Exception as e:
            self._logger.error(__name__, f'Cannot get server {message.guild.id}', e)
            return

        user: Optional[User] = None
        try:
            user = self._users.get_user_by_discord_id_and_server_id(dc_user_id, server.server_id)
        except Exception as e:
            self._logger.error(__name__, f'Cannot get user {dc_user_id}', e)
            return

        if user is None:
            self._logger.error(__name__, f'User not found {dc_user_id}')
            return

        settings: BaseSettings = self._get_config(message.guild.id)
        old_xp = user.xp
        user.xp += settings.xp_per_message
        self._users.update_user(user)
        self._db.save_changes()

        self._logger.debug(__name__, f'User {user} sent message. xp: from {old_xp} to {user.xp}')

    async def on_member_join(self, member: Union[discord.User, discord.Member]):
        """Register ``member`` as known user and record the server join."""
        self._logger.debug(__name__, f'Module {type(self)} started')
        self._check_for_known_user(member)
        await self._add_if_not_exists_user(member)

    async def on_member_remove(self, member: Union[discord.User, discord.Member]):
        """Close the server join record of ``member``."""
        self._logger.debug(__name__, f'Module {type(self)} started')
        await self._remove_user(member)

    async def on_message(self, message: discord.Message):
        """Count the message and award xp to its author unless the author is a bot.

        Messages without a guild are ignored.
        """
        self._logger.debug(__name__, f'Module {type(self)} started')
        if message is None or message.guild is None:
            return

        self._append_received_message_count(message.guild.id)

        if not message.author.bot:
            self._handle_message_for_xp(message)

    async def on_voice_state_update(self, member: discord.Member, before: discord.VoiceState, after: discord.VoiceState):
        """Start or stop the ontime tracking of ``member`` on voice state changes.

        A channel counts as tracked when it is set and its id is not listed
        in ``settings.afk_channel_ids``. Moving from "not tracked" to
        "tracked" opens a session, the opposite direction closes it; any other
        change between two channels is only traced.

        Args:
            member: Member whose voice state changed.
            before: Voice state before the change.
            after: Voice state after the change.
        """
        self._logger.debug(__name__, f'Module {type(self)} started')
        self._logger.trace(__name__, f'Detected on_voice_state_update {member.id} from {before} to {after}')
        settings: BaseSettings = self._get_config(member.guild.id)
        server = self._servers.get_server_by_discord_id(member.guild.id)

        try:
            was_tracked = before.channel is not None and before.channel.id not in settings.afk_channel_ids
            is_tracked = after.channel is not None and after.channel.id not in settings.afk_channel_ids

            # join: connected to, or moved out of an AFK channel into, a tracked channel
            if is_tracked and not was_tracked:
                self._logger.trace(__name__, f'User {member.id} joined {after.channel}')
                self._update_voice_state(True, member.id, after.channel.id, server)

            # leave: disconnected from, or moved into an AFK channel from, a tracked channel
            elif was_tracked and not is_tracked:
                self._logger.trace(__name__, f'User {member.id} left {before.channel}')
                self._update_voice_state(False, member.id, before.channel.id, server)

            # channel to channel without a change of the tracking state
            elif before.channel is not None and after.channel is not None:
                self._logger.trace(__name__, f'User {member.id} switched to {after.channel}')
        except Exception as e:
            self._logger.error(__name__, f'Cannot handle voice state for user {member.id}', e)