[WIP] Added basic commands & functionality of Gismo@0.4.10

This commit is contained in:
2022-07-16 23:23:07 +02:00
parent f506cb74bb
commit 9f8dd52c82
95 changed files with 4385 additions and 58 deletions

1
src/modules/__init__.py Normal file
View File

@@ -0,0 +1 @@
# imports

View File

@@ -0,0 +1 @@
# imports

View File

View File

@@ -0,0 +1,12 @@
from abc import abstractmethod, ABC
from modules.base.configuration.base_server_settings import BaseServerSettings
class BaseHelperABC(ABC):
def __init__(self):
ABC.__init__(self)
@abstractmethod
def get_config(self, g_id: int) -> BaseServerSettings: pass

View File

@@ -0,0 +1,46 @@
{
"ProjectSettings": {
"Name": "base",
"Version": {
"Major": "0",
"Minor": "0",
"Micro": "0"
},
"Author": "",
"AuthorEmail": "",
"Description": "",
"LongDescription": "",
"URL": "",
"CopyrightDate": "",
"CopyrightName": "",
"LicenseName": "",
"LicenseDescription": "",
"Dependencies": [
"cpl-core>=2022.7.0.post2"
],
"DevDependencies": [
"cpl-cli>=2022.7.0.post2"
],
"PythonVersion": ">=3.10.4",
"PythonPath": {
"linux": ""
},
"Classifiers": []
},
"BuildSettings": {
"ProjectType": "library",
"SourcePath": "",
"OutputPath": "../../dist",
"Main": "base.main",
"EntryPoint": "base",
"IncludePackageData": false,
"Included": [],
"Excluded": [
"*/__pycache__",
"*/logs",
"*/tests"
],
"PackageData": {},
"ProjectReferences": []
}
}

View File

@@ -0,0 +1,25 @@
# -*- coding: utf-8 -*-
"""
gismo sh-edraft Gismo
~~~~~~~~~~~~~~~~~~~
sh-edraft Dicord bot Gismo
:copyright: (c) 2021 - 2022 sh-edraft.de
:license: MIT, see LICENSE for more details.
"""
__title__ = 'modules.base.service'
__author__ = 'Sven Heidemann'
__license__ = 'MIT'
__copyright__ = 'Copyright (c) 2021 - 2022 sh-edraft.de'
__version__ = '0.4.2'
from collections import namedtuple
# imports
VersionInfo = namedtuple('VersionInfo', 'major minor micro')
version_info = VersionInfo(major='0', minor='4', micro='2')

View File

@@ -0,0 +1,65 @@
from cpl_core.configuration import ConfigurationABC
from cpl_core.database.context import DatabaseContext
from cpl_core.logging import LoggerABC
from cpl_discord.command import DiscordCommandABC
from cpl_discord.service import DiscordBotServiceABC
from cpl_translation import TranslatePipe
from discord import VoiceChannel
from discord.ext import commands
from discord.ext.commands import Context
from bot_core.abc.client_utils_service_abc import ClientUtilsServiceABC
from bot_core.abc.message_service_abc import MessageServiceABC
from bot_core.configuration.server_settings import ServerSettings
from bot_data.abc.client_repository_abc import ClientRepositoryABC
from modules.base.configuration.base_server_settings import BaseServerSettings
class AFKCommand(DiscordCommandABC):
def __init__(
self,
logger: LoggerABC,
config: ConfigurationABC,
message_service: MessageServiceABC,
clients: ClientRepositoryABC,
db: DatabaseContext,
bot: DiscordBotServiceABC,
client_utils: ClientUtilsServiceABC,
translate: TranslatePipe
):
DiscordCommandABC.__init__(self)
self._logger = logger
self._config = config
self._message_service = message_service
self._clients = clients
self._db = db
self._bot = bot
self._client_utils = client_utils
self._t = translate
self._logger.trace(__name__, f'Loaded command service: {type(self).__name__}')
@commands.command()
async def afk(self, ctx: Context):
self._logger.debug(__name__, f'Received command afk {ctx}')
self._client_utils.received_command(ctx.guild.id)
settings: BaseServerSettings = self._config.get_configuration(f'BaseServerSettings_{ctx.guild.id}')
server_settings: ServerSettings = self._config.get_configuration(f'ServerSettings_{ctx.guild.id}')
if ctx.author.voice is None or ctx.author.voice.channel is None:
await self._message_service.send_ctx_msg(ctx, self._t.transform('modules.base.afk_command_channel_missing_message'))
self._logger.trace(__name__, f'Finished afk command')
return
self._bot.loop.create_task(self._message_service.send_ctx_msg(ctx, self._t.transform('modules.base.afk_command_move_message')))
channel: VoiceChannel = ctx.guild.get_channel(settings.afk_command_channel_id)
try:
await ctx.author.move_to(channel)
self._client_utils.moved_user(ctx.guild.id)
except Exception as e:
self._logger.error(__name__, f'Cannot move user {ctx.author.id} to channel {ctx.channel.id}', e)
await self._message_service.send_ctx_msg(ctx, self._t.transform('common.no_permission_message'))
self._logger.trace(__name__, f'Finished afk command')

View File

@@ -0,0 +1,40 @@
from cpl_core.configuration import ConfigurationABC
from cpl_core.logging import LoggerABC
from cpl_discord.command import DiscordCommandABC
from cpl_discord.service import DiscordBotServiceABC
from discord.ext import commands
from discord.ext.commands import Context
from bot_core.abc.client_utils_service_abc import ClientUtilsServiceABC
from bot_core.abc.message_service_abc import MessageServiceABC
from modules.base.configuration.base_server_settings import BaseServerSettings
class HelpCommand(DiscordCommandABC):
def __init__(
self,
config: ConfigurationABC,
logger: LoggerABC,
message_service: MessageServiceABC,
bot: DiscordBotServiceABC,
client_utils: ClientUtilsServiceABC
):
DiscordCommandABC.__init__(self)
self._config = config
self._logger = logger
self._message_service = message_service
self._bot = bot
self._client_utils = client_utils
self._logger.trace(__name__, f'Loaded command service: {type(self).__name__}')
@commands.command()
async def help(self, ctx: Context, persistent_flag: str = None):
self._logger.debug(__name__, f'Received command help {ctx}:{persistent_flag}')
self._client_utils.received_command(ctx.guild.id)
settings: BaseServerSettings = self._config.get_configuration(f'BaseServerSettings_{ctx.guild.id}')
is_persistent = persistent_flag == '--stay'
await self._message_service.send_ctx_msg(ctx, settings.help_command_reference_url, is_persistent=is_persistent)
self._logger.trace(__name__, f'Finished help command')

View File

@@ -0,0 +1,94 @@
from datetime import datetime
from cpl_core.configuration import ConfigurationABC
from cpl_core.logging import LoggerABC
from cpl_discord.command import DiscordCommandABC
from cpl_discord.service import DiscordBotServiceABC
from cpl_query.extension import List
from discord.ext import commands
from discord.ext.commands import Context
import bot
from bot_core.abc.client_utils_service_abc import ClientUtilsServiceABC
from bot_core.abc.message_service_abc import MessageServiceABC
from modules.base.configuration.base_server_settings import BaseServerSettings
class InfoCommandService(DiscordCommandABC):
def __init__(
self,
config: ConfigurationABC,
logger: LoggerABC,
message_service: MessageServiceABC,
bot: DiscordBotServiceABC,
client_utils: ClientUtilsServiceABC
):
DiscordCommandABC.__init__(self)
self._config = config
self._logger = logger
self._message_service = message_service
self._bot = bot
self._client_utils = client_utils
self._logger.trace(__name__, f'Loaded command service: {type(self).__name__}')
@commands.command()
async def info(self, ctx: Context):
#
# Todo: Use native embeds!!!
#
self._logger.debug(__name__, f'Received command info {ctx}')
self._client_utils.received_command(ctx.guild.id)
client = self._client_utils.get_client(self._bot.user.id, ctx.guild.id)
settings: BaseServerSettings = self._config.get_configuration(f'BaseServerSettings_{ctx.guild.id}')
embed_description = EmbedDescription(
settings.info_command_message.embed_description.title,
settings.info_command_message.embed_description.description,
settings.info_command_message.embed_description.url,
settings.info_command_message.embed_description.color,
List(EmbedDescriptionField),
settings.info_command_message.embed_description.footer
)
for i in range(len(settings.info_command_message.embed_description.fields)):
settings_field = settings.info_command_message.embed_description.fields[i]
field = EmbedDescriptionField(settings_field.name, settings_field.value, settings_field.inline)
if settings_field.value == '$version':
field.value = bot.__version__
elif settings_field.value == '$ontime':
start_time = self._config.get_configuration('Bot_StartTime')
ontime = round((datetime.now() - datetime.strptime(start_time, '%Y-%m-%d %H:%M:%S.%f')).total_seconds()/3600, 2)
field.value = f'{ontime}h'
elif settings_field.value == '$sent_message_count':
field.value = client.sent_message_count
elif settings_field.value == '$received_message_count':
field.value = client.received_message_count
elif settings_field.value == '$deleted_message_count':
field.value = client.deleted_message_count
elif settings_field.value == '$received_command_count':
field.value = client.received_command_count
elif settings_field.value == '$moved_users_count':
field.value = client.moved_users_count
elif settings_field.value == '$modules':
field.value = ''
for module in ModuleABC.__subclasses__():
field.value += f'{module.__name__}\n'
embed_description.fields.append(field)
await self._message_service.send_ctx_msg(
ctx,
EmbedService.get_embed(embed_description)
)
self._logger.trace(__name__, f'Finished info command')

View File

@@ -0,0 +1,37 @@
from cpl_core.logging import LoggerABC
from cpl_discord.command import DiscordCommandABC
from cpl_discord.service import DiscordBotServiceABC
from cpl_translation import TranslatePipe
from discord.ext import commands
from discord.ext.commands import Context
from bot_core.abc.client_utils_service_abc import ClientUtilsServiceABC
from bot_core.abc.message_service_abc import MessageServiceABC
class PingCommand(DiscordCommandABC):
def __init__(
self,
logger: LoggerABC,
message_service: MessageServiceABC,
bot: DiscordBotServiceABC,
client_utils: ClientUtilsServiceABC,
translate: TranslatePipe
):
DiscordCommandABC.__init__(self)
self._logger = logger
self._message_service = message_service
self._bot = bot
self._client_utils = client_utils
self._t = translate
self._logger.trace(__name__, f'Loaded command service: {type(self).__name__}')
@commands.command()
async def ping(self, ctx: Context):
self._logger.debug(__name__, f'Received command ping {ctx}')
self._client_utils.received_command(ctx.guild.id)
await self._message_service.send_ctx_msg(ctx, self._t.transform('modules.base.pong'))
self._logger.trace(__name__, f'Finished ping command')

View File

@@ -0,0 +1,58 @@
import asyncio
from cpl_core.configuration import ConfigurationABC
from cpl_core.logging import LoggerABC
from cpl_discord.command import DiscordCommandABC
from cpl_translation import TranslatePipe
from discord.ext import commands
from discord.ext.commands import Context
from bot_core.abc.client_utils_service_abc import ClientUtilsServiceABC
from bot_core.abc.message_service_abc import MessageServiceABC
from bot_core.configuration.server_settings import ServerSettings
from modules.permission.abc.permission_service_abc import PermissionServiceABC
class PurgeCommand(DiscordCommandABC):
def __init__(
self,
logger: LoggerABC,
config: ConfigurationABC,
message_service: MessageServiceABC,
permissions: PermissionServiceABC,
client_utils: ClientUtilsServiceABC,
translate: TranslatePipe
):
DiscordCommandABC.__init__(self)
self._logger = logger
self._config = config
self._message_service = message_service
self._permissions = permissions
self._client_utils = client_utils
self._t = translate
self._logger.trace(__name__, f'Loaded command service: {type(self).__name__}')
@commands.command()
async def purge(self, ctx: Context):
self._logger.debug(__name__, f'Received command purge {ctx}')
self._client_utils.received_command(ctx.guild.id)
server_settings: ServerSettings = self._config.get_configuration(f'ServerSettings_{ctx.guild.id}')
if not self._permissions.is_member_moderator(ctx.author):
await self._message_service.send_ctx_msg(ctx, self._t.transform('common.no_permission_message'))
self._logger.trace(__name__, f'Finished purge command')
return
await self._message_service.send_ctx_msg(ctx, self._t.transform('modules.base.purge_message'))
await asyncio.sleep(server_settings.message_delete_timer)
try:
await ctx.channel.purge()
except Exception as e:
self._logger.error(__name__, f'Cannot purge channel {ctx.channel.id}', e)
await self._message_service.send_ctx_msg(ctx, self._t.transform('common.bot_has_no_permission_message'))
self._logger.trace(__name__, f'Finished purge command')

View File

@@ -0,0 +1,130 @@
from typing import Optional
import discord
from cpl_core.configuration import ConfigurationABC
from cpl_core.logging import LoggerABC
from cpl_discord.command import DiscordCommandABC
from cpl_discord.service import DiscordBotServiceABC
from cpl_query.extension import List
from discord.ext import commands
from discord.ext.commands import Context
from bot_core.abc.client_utils_service_abc import ClientUtilsServiceABC
from bot_core.abc.message_service_abc import MessageServiceABC
from bot_data.abc.server_repository_abc import ServerRepositoryABC
from bot_data.abc.user_joined_server_repository_abc import UserJoinedServerRepositoryABC
from bot_data.abc.user_repository_abc import UserRepositoryABC
from modules.base.configuration.base_server_settings import BaseServerSettings
from modules.permission.abc.permission_service_abc import PermissionServiceABC
class UserInfoCommandService(DiscordCommandABC):
def __init__(
self,
config: ConfigurationABC,
logger: LoggerABC,
message_service: MessageServiceABC,
bot: DiscordBotServiceABC,
client_utils: ClientUtilsServiceABC,
permissions: PermissionServiceABC,
servers: ServerRepositoryABC,
users: UserRepositoryABC,
user_joined_servers: UserJoinedServerRepositoryABC
):
DiscordCommandABC.__init__(self)
self._config = config
self._logger = logger
self._message_service = message_service
self._bot = bot
self._client_utils = client_utils
self._permissions = permissions
self._servers = servers
self._users = users
self._user_joined_servers = user_joined_servers
self._logger.trace(__name__, f'Loaded command service: {type(self).__name__}')
@commands.command(name='user-info')
async def user_info(self, ctx: Context, member: Optional[discord.Member] = None, *, wait: int = None):
#
# Todo: Use native embeds!!!
#
self._logger.debug(__name__, f'Received command user-info {ctx}:{member},{wait}')
self._client_utils.received_command(ctx.guild.id)
settings: BaseServerSettings = self._config.get_configuration(f'BaseServerSettings_{ctx.guild.id}')
if not self._permissions.is_member_moderator(ctx.author):
await self._message_service.send_ctx_msg(ctx, settings.no_permission_message)
self._logger.trace(__name__, f'Finished purge command')
return
if member is None or not isinstance(member, discord.Member):
member = ctx.author
server = self._servers.find_server_by_discord_id(ctx.guild.id)
user = self._users.find_user_by_discord_id_and_server_id(member.id, server.server_id)
joins = self._user_joined_servers.get_user_joined_servers_by_user_id(user.user_id)
embed_description = EmbedDescription(
settings.user_info_command_message.embed_description.title.format(member.name),
settings.user_info_command_message.embed_description.description.format(member.name),
settings.user_info_command_message.embed_description.url,
settings.user_info_command_message.embed_description.color,
List(EmbedDescriptionField),
settings.user_info_command_message.embed_description.footer
)
for i in range(len(settings.user_info_command_message.embed_description.fields)):
settings_field = settings.user_info_command_message.embed_description.fields[i]
field = EmbedDescriptionField(settings_field.name, settings_field.value, settings_field.inline)
if settings_field.value == '$id':
field.value = settings_field.value.replace('$id', str(member.id))
elif settings_field.value == '$name':
field.value = settings_field.value.replace('$name', member.name)
elif settings_field.value == '$discord_join':
field.value = settings_field.value.replace('$discord_join', str(member.created_at))
elif settings_field.value == '$last_join':
field.value = settings_field.value.replace('$last_join', str(member.joined_at))
elif settings_field.value == '$xp':
field.value = settings_field.value.replace('$xp', str(user.xp))
elif settings_field.value == '$roles':
roles = ''
for role in member.roles:
roles += f'{role.name}\n'
field.value = settings_field.value.replace('$roles', roles)
elif settings_field.value == '$joins':
joins_string = ''
for join in joins:
joins_string += f'{join.joined_on}\n'
field.value = settings_field.value.replace('$joins', joins_string)
elif settings_field.value == '$leavings':
leavings_string = ''
for join in joins:
if join.leaved_on is None:
if leavings_string == '':
leavings_string = '/'
continue
leavings_string += f'{join.leaved_on}\n'
field.value = settings_field.value.replace('$leavings', leavings_string)
elif settings_field.value == '$warnings':
field.value = 'Not Implemented yet'
embed_description.fields.append(field)
await self._message_service.send_ctx_msg(
ctx,
EmbedService.get_embed(embed_description),
wait_before_delete=wait
)
self._logger.trace(__name__, f'Finished user-info command')

View File

@@ -0,0 +1,58 @@
import traceback
from cpl_core.configuration.configuration_model_abc import ConfigurationModelABC
from cpl_core.console import Console
class BaseServerSettings(ConfigurationModelABC):
def __init__(self):
ConfigurationModelABC.__init__(self)
self._id: int = 0
self._max_voice_state_hours: int = 0
self._xp_per_message: int = 0
self._xp_per_ontime_hour: int = 0
self._afk_channel_ids: list[int] = []
@property
def id(self) -> int:
return self._id
@property
def max_voice_state_hours(self) -> int:
return self._max_voice_state_hours
@property
def xp_per_message(self) -> int:
return self._xp_per_message
@property
def xp_per_ontime_hour(self) -> int:
return self._xp_per_ontime_hour
@property
def afk_channel_ids(self) -> list[int]:
return self._afk_channel_ids
@property
def afk_command_channel_id(self) -> int:
return self._afk_command_channel_id
@property
def help_command_reference_url(self) -> str:
return self._help_command_reference_url
def from_dict(self, settings: dict):
try:
self._id = int(settings['Id'])
self._max_voice_state_hours = int(settings['MaxVoiceStateHours'])
self._xp_per_message = int(settings['XpPerMessage'])
self._xp_per_ontime_hour = int(settings['XpPerOntimeHour'])
for index in settings['AFKChannelIds']:
self._afk_channel_ids.append(int(index))
self._afk_command_channel_id = settings['AFKCommandChannelId']
self._help_command_reference_url = settings['HelpCommandReferenceUrl']
except Exception as e:
Console.error(f'[ ERROR ] [ {__name__} ]: Reading error in {type(self).__name__} settings')
Console.error(f'[ EXCEPTION ] [ {__name__} ]: {e} -> {traceback.format_exc()}')

View File

@@ -0,0 +1,32 @@
import traceback
from cpl_core.configuration import ConfigurationModelABC
from cpl_core.console import Console
from cpl_query.extension import List
from modules.base.configuration.base_server_settings import BaseServerSettings
class BaseSettings(ConfigurationModelABC):
def __init__(self):
ConfigurationModelABC.__init__(self)
self._servers: List[BaseServerSettings] = List()
@property
def servers(self) -> List[BaseServerSettings]:
return self._servers
def from_dict(self, settings: dict):
try:
servers = List(BaseServerSettings)
for s in settings:
st = BaseServerSettings()
settings[s]['Id'] = s
st.from_dict(settings[s])
servers.append(st)
self._servers = servers
except Exception as e:
Console.error(f'[ ERROR ] [ {__name__} ]: Reading error in {type(self).__name__} settings')
Console.error(f'[ EXCEPTION ] [ {__name__} ]: {e} -> {traceback.format_exc()}')

View File

View File

@@ -0,0 +1,97 @@
from datetime import datetime
from typing import Union
import discord
from cpl_core.configuration import ConfigurationABC
from cpl_core.database.context import DatabaseContextABC
from cpl_core.logging import LoggerABC
from cpl_discord.events import OnMemberJoinABC
from cpl_translation import TranslatePipe
from bot_core.abc.message_service_abc import MessageServiceABC
from bot_data.abc.known_user_repository_abc import KnownUserRepositoryABC
from bot_data.abc.server_repository_abc import ServerRepositoryABC
from bot_data.abc.user_joined_server_repository_abc import UserJoinedServerRepositoryABC
from bot_data.abc.user_repository_abc import UserRepositoryABC
from bot_data.model.known_user import KnownUser
from bot_data.model.user import User
from bot_data.model.user_joined_server import UserJoinedServer
from modules.base.abc.base_helper_abc import BaseHelperABC
from modules.base.configuration.base_server_settings import BaseServerSettings
from modules.permission.abc.permission_service_abc import PermissionServiceABC
class BaseOnMemberJoinEvent(OnMemberJoinABC):
def __init__(
self,
config: ConfigurationABC,
logger: LoggerABC,
base_helper: BaseHelperABC,
messenger: MessageServiceABC,
permissions: PermissionServiceABC,
db: DatabaseContextABC,
known_users: KnownUserRepositoryABC,
users: UserRepositoryABC,
servers: ServerRepositoryABC,
user_joins: UserJoinedServerRepositoryABC,
translate: TranslatePipe
):
OnMemberJoinABC.__init__(self)
self._config = config
self._logger = logger
self._base_helper = base_helper
self._messenger = messenger
self._permission_service = permissions
self._db = db
self._known_users = known_users
self._users = users
self._servers = servers
self._user_joins = user_joins
self._t = translate
def _check_for_known_user(self, member: Union[discord.User, discord.Member]):
self._logger.debug(__name__, f'Check if user is already known {member}')
try:
user = self._known_users.find_user_by_discord_id(member.id)
if user is not None:
return
self._logger.debug(__name__, f'Add user: {member.id}')
self._known_users.add_user(KnownUser(member.id))
self._db.save_changes()
except Exception as e:
self._logger.error(__name__, f'Cannot get user {member.id}', e)
async def _add_if_not_exists_user_async(self, member: Union[discord.User, discord.Member]):
self._logger.debug(__name__, f'Check if user exists {member}')
settings: BaseServerSettings = self._base_helper.get_config(member.guild.id)
await self._messenger.send_dm_message(self._t.transform('modules.base.welcome_message').format(member.guild.name), member)
for admin in self._permission_service.get_admins(member.guild.id):
await self._messenger.send_dm_message(self._t.transform('modules.base.welcome_message_for_team').format(member.name), admin)
for moderator in self._permission_service.get_moderators(member.guild.id):
await self._messenger.send_dm_message(self._t.transform('modules.base.welcome_message_for_team').format(member.name), moderator)
try:
server = self._servers.get_server_by_discord_id(member.guild.id)
user = self._users.find_user_by_discord_id_and_server_id(member.id, server.server_id)
if user is not None:
self._user_joins.add_user_joined_server(UserJoinedServer(user, datetime.now()))
return
self._logger.debug(__name__, f'Add user: {member.id}')
self._users.add_user(User(member.id, 0, server))
self._db.save_changes()
user = self._users.get_user_by_discord_id_and_server_id(member.id, server.server_id)
self._user_joins.add_user_joined_server(UserJoinedServer(user, datetime.now()))
self._db.save_changes()
except Exception as e:
self._logger.error(__name__, f'Cannot get user {member.id}', e)
async def on_member_join(self, member: discord.Member):
self._logger.debug(__name__, f'Module {type(self)} started')
self._check_for_known_user(member)
await self._add_if_not_exists_user_async(member)

View File

@@ -0,0 +1,64 @@
from datetime import datetime
from typing import Union
import discord
from cpl_core.database.context import DatabaseContextABC
from cpl_core.logging import LoggerABC
from cpl_discord.events import OnMemberRemoveABC
from cpl_translation import TranslatePipe
from bot_core.abc.message_service_abc import MessageServiceABC
from bot_data.abc.server_repository_abc import ServerRepositoryABC
from bot_data.abc.user_joined_server_repository_abc import UserJoinedServerRepositoryABC
from bot_data.abc.user_repository_abc import UserRepositoryABC
from modules.base.abc.base_helper_abc import BaseHelperABC
from modules.base.configuration.base_server_settings import BaseServerSettings
class BaseOnMemberRemoveEvent(OnMemberRemoveABC):
def __init__(
self,
logger: LoggerABC,
base_helper: BaseHelperABC,
db: DatabaseContextABC,
messenger: MessageServiceABC,
users: UserRepositoryABC,
servers: ServerRepositoryABC,
user_joins: UserJoinedServerRepositoryABC,
translate: TranslatePipe
):
OnMemberRemoveABC.__init__(self)
self._logger = logger
self._base_helper = base_helper
self._db = db
self._messenger = messenger
self._users = users
self._servers = servers
self._user_joins = user_joins
self._t = translate
async def _remove_user(self, member: Union[discord.User, discord.Member]):
self._logger.debug(__name__, f'Remove user {member}')
settings: BaseServerSettings = self._base_helper.get_config(member.guild.id)
await self._messenger.send_dm_message(self._t.transform('modules.base.goodbye_message'), member)
try:
server = self._servers.get_server_by_discord_id(member.guild.id)
user = self._users.find_user_by_discord_id_and_server_id(member.id, server.server_id)
if user is None:
self._logger.error(__name__, f'Cannot find user {member}')
return
join = self._user_joins.get_active_user_joined_server_by_user_id(user.user_id)
join.leaved_on = datetime.now()
self._user_joins.update_user_joined_server(join)
self._db.save_changes()
except Exception as e:
self._logger.error(__name__, f'Cannot get user {member.id}', e)
async def on_member_remove(self, member: discord.Member):
self._logger.debug(__name__, f'Module {type(self)} started')
await self._remove_user(member)

View File

@@ -0,0 +1,79 @@
from typing import Optional
import discord
from cpl_core.database.context import DatabaseContextABC
from cpl_core.logging import LoggerABC
from cpl_discord.events import OnMessageABC
from cpl_discord.service import DiscordBotServiceABC
from bot_data.abc.client_repository_abc import ClientRepositoryABC
from bot_data.abc.server_repository_abc import ServerRepositoryABC
from bot_data.abc.user_repository_abc import UserRepositoryABC
from bot_data.model.user import User
from modules.base.abc.base_helper_abc import BaseHelperABC
from modules.base.configuration.base_server_settings import BaseServerSettings
class BaseOnMessageEvent(OnMessageABC):
def __init__(
self,
logger: LoggerABC,
bhs: BaseHelperABC,
db: DatabaseContextABC,
bot: DiscordBotServiceABC,
users: UserRepositoryABC,
clients: ClientRepositoryABC,
servers: ServerRepositoryABC,
):
OnMessageABC.__init__(self)
self._logger = logger
self._base_helper = bhs
self._db = db
self._bot = bot
self._users = users
self._clients = clients
self._servers = servers
def _append_received_message_count(self, g_id: int):
try:
self._clients.append_received_message_count(self._bot.user.id, g_id, 1)
self._db.save_changes()
except Exception as e:
self._logger.error(__name__, f'Cannot edit client {self._bot.user.id}@{g_id}', e)
def _handle_message_for_xp(self, message: discord.Message):
dc_user_id = message.author.id
try:
server = self._servers.get_server_by_discord_id(message.guild.id)
except Exception as e:
self._logger.error(__name__, f'Cannot get server {message.guild.id}', e)
return
user: Optional[User] = None
try:
user = self._users.get_user_by_discord_id_and_server_id(dc_user_id, server.server_id)
except Exception as e:
self._logger.error(__name__, f'Cannot get user {dc_user_id}', e)
return
if user is None:
self._logger.error(__name__, f'User not found {dc_user_id}')
return
settings: BaseServerSettings = self._base_helper.get_config(message.guild.id)
old_xp = user.xp
user.xp += settings.xp_per_message
self._users.update_user(user)
self._db.save_changes()
self._logger.debug(__name__, f'User {user} sent message. xp: from {old_xp} to {user.xp}')
async def on_message(self, message: discord.Message):
self._logger.debug(__name__, f'Module {type(self)} started')
if message is None or message.guild is None:
return
self._append_received_message_count(message.guild.id)
if not message.author.bot:
self._handle_message_for_xp(message)

View File

@@ -0,0 +1,118 @@
from datetime import datetime
from typing import Optional
import discord
from cpl_core.configuration import ConfigurationABC
from cpl_core.database.context import DatabaseContextABC
from cpl_core.logging import LoggerABC
from cpl_discord.events import OnVoiceStateUpdateABC
from bot_data.abc.known_user_repository_abc import KnownUserRepositoryABC
from bot_data.abc.server_repository_abc import ServerRepositoryABC
from bot_data.abc.user_joined_server_repository_abc import UserJoinedServerRepositoryABC
from bot_data.abc.user_joined_voice_channel_abc import UserJoinedVoiceChannelRepositoryABC
from bot_data.abc.user_repository_abc import UserRepositoryABC
from bot_data.model.server import Server
from bot_data.model.user import User
from bot_data.model.user_joined_voice_channel import UserJoinedVoiceChannel
from modules.base.abc.base_helper_abc import BaseHelperABC
from modules.base.configuration.base_server_settings import BaseServerSettings
class BaseOnVoiceStateUpdateEvent(OnVoiceStateUpdateABC):
def __init__(
self,
config: ConfigurationABC,
logger: LoggerABC,
base_helper: BaseHelperABC,
servers: ServerRepositoryABC,
known_users: KnownUserRepositoryABC,
users: UserRepositoryABC,
user_joins: UserJoinedServerRepositoryABC,
user_joins_vc: UserJoinedVoiceChannelRepositoryABC,
db: DatabaseContextABC,
):
OnVoiceStateUpdateABC.__init__(self)
self._config = config
self._logger = logger
self._base_helper = base_helper
self._servers = servers
self._known_users = known_users
self._users = users
self._user_joins = user_joins
self._user_joins_vc = user_joins_vc
self._db = db
self._logger.info(__name__, f'Module {type(self)} loaded')
def _update_voice_state(self, joined: bool, dc_user_id: int, dc_channel_id: int, server: Server):
user: Optional[User] = None
try:
user = self._users.get_user_by_discord_id_and_server_id(dc_user_id, server.server_id)
except Exception as e:
self._logger.error(__name__, f'Cannot get user {dc_user_id}', e)
return
if user is None:
self._logger.error(__name__, f'User not found {dc_user_id}')
return
try:
if joined:
join = UserJoinedVoiceChannel(user, dc_channel_id, datetime.now())
self._user_joins_vc.add_user_joined_voice_channel(join)
self._db.save_changes()
return
settings: BaseServerSettings = self._get_config(server.discord_server_id)
join = self._user_joins_vc.get_active_user_joined_voice_channel_by_user_id(user.user_id)
join.leaved_on = datetime.now()
# ontime as hours
ontime = round((join.leaved_on - join.joined_on).total_seconds() / 3600, 2)
old_xp = user.xp
user.xp += round(ontime * settings.xp_per_ontime_hour)
self._user_joins_vc.update_user_joined_voice_channel(join)
self._users.update_user(user)
self._db.save_changes()
self._logger.debug(__name__, f'User {user} leaved_on {join.leaved_on}. Ontime: {ontime}h | xp: from {old_xp} to {user.xp}')
except Exception as e:
self._logger.error(__name__, f'Ontime validation failed', e)
async def on_voice_state_update(self, member: discord.Member, before: discord.VoiceState, after: discord.VoiceState):
self._logger.debug(__name__, f'Module {type(self)} started')
self._logger.trace(__name__, f'Detected on_voice_state_update {member.id} from {before} to {after}')
settings: BaseServerSettings = self._base_helper.get_config(member.guild.id)
server = self._servers.get_server_by_discord_id(member.guild.id)
try:
# join
if before.channel is None and after.channel is not None and after.channel.id not in settings.afk_channel_ids:
self._logger.trace(__name__, f'User {member.id} joined {after.channel}')
self._update_voice_state(True, member.id, after.channel.id, server)
# leave
elif before.channel is not None and after.channel is None and before.channel.id not in settings.afk_channel_ids:
self._logger.trace(__name__, f'User {member.id} left {before.channel}')
self._update_voice_state(False, member.id, before.channel.id, server)
# channel to channel
elif before.channel is not None and after.channel is not None:
# joined
if before.channel.id in settings.afk_channel_ids and after.channel.id not in settings.afk_channel_ids:
self._logger.trace(__name__, f'User {member.id} joined {after.channel}')
self._update_voice_state(True, member.id, after.channel.id, server)
# left
elif after.channel.id in settings.afk_channel_ids and before.channel.id not in settings.afk_channel_ids:
self._logger.trace(__name__, f'User {member.id} left {before.channel}')
self._update_voice_state(False, member.id, before.channel.id, server)
else:
self._logger.trace(__name__, f'User {member.id} switched to {after.channel}')
except Exception as e:
self._logger.error(__name__, f'Cannot handle voice state for user {member.id}', e)

View File

@@ -0,0 +1 @@
# imports

View File

@@ -0,0 +1,14 @@
from cpl_core.configuration import ConfigurationABC
from modules.base.abc.base_helper_abc import BaseHelperABC
from modules.base.configuration.base_server_settings import BaseServerSettings
class BaseHelperService(BaseHelperABC):
def __init__(self, config: ConfigurationABC):
BaseHelperABC.__init__(self)
self._config = config
def get_config(self, g_id: int) -> BaseServerSettings:
return self._config.get_configuration(f'BaseServerSettings_{g_id}')

View File

@@ -0,0 +1 @@
# imports:

View File

@@ -0,0 +1,46 @@
{
"ProjectSettings": {
"Name": "boot-log",
"Version": {
"Major": "0",
"Minor": "0",
"Micro": "0"
},
"Author": "",
"AuthorEmail": "",
"Description": "",
"LongDescription": "",
"URL": "",
"CopyrightDate": "",
"CopyrightName": "",
"LicenseName": "",
"LicenseDescription": "",
"Dependencies": [
"cpl-core>=2022.7.0.post2"
],
"DevDependencies": [
"cpl-cli>=2022.7.0.post2"
],
"PythonVersion": ">=3.10.4",
"PythonPath": {
"linux": ""
},
"Classifiers": []
},
"BuildSettings": {
"ProjectType": "library",
"SourcePath": "",
"OutputPath": "../../dist",
"Main": "boot_log.main",
"EntryPoint": "boot-log",
"IncludePackageData": false,
"Included": [],
"Excluded": [
"*/__pycache__",
"*/logs",
"*/tests"
],
"PackageData": {},
"ProjectReferences": []
}
}

View File

@@ -0,0 +1,17 @@
from datetime import datetime
from cpl_core.application.application_extension_abc import ApplicationExtensionABC
from cpl_core.configuration import ConfigurationABC
from cpl_core.dependency_injection import ServiceProviderABC
from cpl_core.logging import LoggerABC
class BootLogExtension(ApplicationExtensionABC):
def __init__(self):
pass
async def run(self, config: ConfigurationABC, services: ServiceProviderABC):
logger: LoggerABC = services.get_service(LoggerABC)
logger.debug(__name__, 'BootLog extension started')
config.add_configuration('Bot_StartTime', str(datetime.now()))

View File

@@ -0,0 +1,77 @@
from datetime import datetime
from cpl_core.configuration import ConfigurationABC
from cpl_core.logging import LoggerABC
from cpl_discord.events import OnReadyABC
from cpl_discord.service import DiscordBotServiceABC
from cpl_translation import TranslatePipe
from discord import guild
from bot_core.abc.message_service_abc import MessageServiceABC
from bot_core.configuration.server_settings import ServerSettings
from modules.boot_log.configuration.boot_log_server_settings import BootLogServerSettings
class BootLogOnReadyEvent(OnReadyABC):
def __init__(
self,
config: ConfigurationABC,
logger: LoggerABC,
bot: DiscordBotServiceABC,
message_service: MessageServiceABC,
translate: TranslatePipe
):
OnReadyABC.__init__(self)
self._config = config
self._logger = logger
self._bot = bot
self._message_service = message_service
self._t = translate
self._logger.info(__name__, f'Module {type(self)} loaded')
async def on_ready(self):
self._logger.debug(__name__, f'Module {type(self)} started')
try:
start_time = self._config.get_configuration('Bot_StartTime')
init_time = round((datetime.now() - datetime.strptime(start_time, '%Y-%m-%d %H:%M:%S.%f')).total_seconds(), 2)
self._config.add_configuration('InitTime', str(init_time))
self._logger.debug(__name__, f'Bot Init time: {init_time}s')
# print warning if initialisation took too long
if init_time >= 30:
self._logger.warn(
__name__, 'It takes long time to start the bot!')
# print error if initialisation took way too long
elif init_time >= 90:
self._logger.error(
__name__, 'It takes very long time to start the bot!!!')
except Exception as e:
self._logger.error(__name__, 'Init time calculation failed', e)
return
for g in self._bot.guilds:
g: guild = g
self._logger.debug(__name__, f'Server detected: {g.id}')
server_settings: ServerSettings = self._config.get_configuration(f'ServerSettings_{g.id}')
if server_settings is None:
self._logger.error(__name__, f'BootLog settings for server {g.id} not found!')
return
module_settings: BootLogServerSettings = self._config.get_configuration(f'BootLogServerSettings_{g.id}')
if module_settings is None:
self._logger.error(__name__, f'Config {type(self).__name__}_{g.id} not found!')
return
await self._message_service.send_channel_message(
self._bot.get_channel(
module_settings.login_message_channel_id
),
self._t.transform('modules.boot_log.login_message').format(init_time)
)
self._logger.info(__name__, 'Bot is ready')
self._logger.trace(__name__, f'Module {type(self)} stopped')

View File

@@ -0,0 +1,29 @@
import traceback
from cpl_core.configuration.configuration_model_abc import ConfigurationModelABC
from cpl_core.console import Console
class BootLogServerSettings(ConfigurationModelABC):
def __init__(self):
ConfigurationModelABC.__init__(self)
self._id: int = 0
self._login_message_channel_id: int = 0
@property
def id(self) -> int:
return self._id
@property
def login_message_channel_id(self) -> int:
return self._login_message_channel_id
def from_dict(self, settings: dict):
try:
self._id = int(settings['Id'])
self._login_message_channel_id = int(settings['LoginMessageChannelId'])
except Exception as e:
Console.error(f'[ ERROR ] [ {__name__} ]: Reading error in {type(self).__name__} settings')
Console.error(f'[ EXCEPTION ] [ {__name__} ]: {e} -> {traceback.format_exc()}')

View File

@@ -0,0 +1,32 @@
import traceback
from cpl_core.configuration.configuration_model_abc import ConfigurationModelABC
from cpl_core.console import Console
from cpl_query.extension import List
from modules.boot_log.configuration.boot_log_server_settings import BootLogServerSettings
class BootLogSettings(ConfigurationModelABC):
def __init__(self):
ConfigurationModelABC.__init__(self)
self._servers: List[BootLogServerSettings] = List()
@property
def servers(self) -> List[BootLogServerSettings]:
return self._servers
def from_dict(self, settings: dict):
try:
servers = List(BootLogServerSettings)
for s in settings:
st = BootLogServerSettings()
settings[s]['Id'] = s
st.from_dict(settings[s])
servers.append(st)
self._servers = servers
except Exception as e:
Console.error(f'[ ERROR ] [ {__name__} ]: Reading error in {type(self).__name__} settings')
Console.error(f'[ EXCEPTION ] [ {__name__} ]: {e} -> {traceback.format_exc()}')

View File

@@ -0,0 +1 @@
# imports:

View File

@@ -0,0 +1,46 @@
{
"ProjectSettings": {
"Name": "database",
"Version": {
"Major": "1",
"Minor": "0",
"Micro": "0.dev1"
},
"Author": "Sven Heidemann",
"AuthorEmail": "sven.heidemann@sh-edraft.de",
"Description": "Keksdose bot - db module",
"LongDescription": "Discord bot for the Keksdose discord Server - database module",
"URL": "https://www.sh-edraft.de",
"CopyrightDate": "2022",
"CopyrightName": "sh-edraft.de",
"LicenseName": "MIT",
"LicenseDescription": "MIT, see LICENSE for more details.",
"Dependencies": [
"cpl-core>=2022.7.0.post2"
],
"DevDependencies": [
"cpl-cli>=2022.7.0.post2"
],
"PythonVersion": ">=3.10.4",
"PythonPath": {
"linux": ""
},
"Classifiers": []
},
"BuildSettings": {
"ProjectType": "library",
"SourcePath": "",
"OutputPath": "../../dist",
"Main": "database.main",
"EntryPoint": "database",
"IncludePackageData": false,
"Included": [],
"Excluded": [
"*/__pycache__",
"*/logs",
"*/tests"
],
"PackageData": {},
"ProjectReferences": []
}
}

View File

@@ -0,0 +1,21 @@
from datetime import datetime
from cpl_core.application.application_extension_abc import ApplicationExtensionABC
from cpl_core.configuration import ConfigurationABC
from cpl_core.dependency_injection import ServiceProviderABC
from cpl_core.logging import LoggerABC
from bot_data.service.migration_service import MigrationService
class DatabaseExtension(ApplicationExtensionABC):
def __init__(self):
pass
async def run(self, config: ConfigurationABC, services: ServiceProviderABC):
logger: LoggerABC = services.get_service(LoggerABC)
logger.debug(__name__, 'Database extension started')
config.add_configuration('Database_StartTime', str(datetime.now()))
migrations: MigrationService = services.get_service(MigrationService)
migrations.migrate()

View File

@@ -0,0 +1,310 @@
from ctypes import Union
from datetime import datetime, timedelta
import discord
from cpl_core.configuration import ConfigurationABC
from cpl_core.database.context import DatabaseContextABC
from cpl_core.logging import LoggerABC
from cpl_discord.events import OnReadyABC
from cpl_discord.service import DiscordBotServiceABC
from bot_data.abc.client_repository_abc import ClientRepositoryABC
from bot_data.abc.known_user_repository_abc import KnownUserRepositoryABC
from bot_data.abc.user_joined_server_repository_abc import UserJoinedServerRepositoryABC
from bot_data.abc.user_joined_voice_channel_abc import UserJoinedVoiceChannelRepositoryABC
from bot_data.abc.user_repository_abc import UserRepositoryABC
from bot_data.model.client import Client
from bot_data.model.known_user import KnownUser
from bot_data.model.server import Server
from bot_data.model.user import User
from bot_data.model.user_joined_server import UserJoinedServer
from bot_data.model.user_joined_voice_channel import UserJoinedVoiceChannel
from bot_data.service.user_repository_service import ServerRepositoryABC
class DatabaseOnReadyEvent(OnReadyABC):
def __init__(
self,
config: ConfigurationABC,
logger: LoggerABC,
bot: DiscordBotServiceABC,
db_context: DatabaseContextABC,
server_repo: ServerRepositoryABC,
user_repo: UserRepositoryABC,
client_repo: ClientRepositoryABC,
known_users: KnownUserRepositoryABC,
user_joins: UserJoinedServerRepositoryABC,
user_joins_vc: UserJoinedVoiceChannelRepositoryABC
):
self._config = config
self._logger = logger
self._bot = bot
self._db_context = db_context
self._servers = server_repo
self._users = user_repo
self._clients = client_repo
self._known_users = known_users
self._user_joins = user_joins
self._user_joins_vc = user_joins_vc
OnReadyABC.__init__(self)
self._logger.info(__name__, f'Module {type(self)} loaded')
def _validate_init_time(self):
try:
start_time = self._config.get_configuration('Database_StartTime')
init_time = round((datetime.now() - datetime.strptime(start_time, '%Y-%m-%d %H:%M:%S.%f')).total_seconds(), 2)
self._config.add_configuration('Database_InitTime', str(init_time))
self._logger.debug(__name__, f'Database Init time: {init_time}s')
# print warning if initialisation took too long
if init_time >= 30:
self._logger.warn(
__name__, 'It takes long time to start the bot!')
# print error if initialisation took way too long
elif init_time >= 90:
self._logger.error(
__name__, 'It takes very long time to start the bot!!!')
except Exception as e: #
self._logger.error(__name__, 'Database init time calculation failed', e)
return
def _check_known_users(self):
self._logger.debug(__name__, f'Start checking KnownUsers table, {len(self._bot.users)}')
for u in self._bot.users:
u: discord.User = u
try:
if u.bot:
self._logger.trace(__name__, f'User {u.id} is ignored, because its a bot')
continue
user = self._known_users.find_user_by_discord_id(u.id)
if user is not None:
continue
self._logger.warn(__name__, f'Unknown user: {u.id}')
self._logger.debug(__name__, f'Add user: {u.id}')
self._known_users.add_user(KnownUser(u.id))
self._db_context.save_changes()
user = self._known_users.find_user_by_discord_id(u.id)
if user is None:
self._logger.fatal(__name__, f'Cannot add user: {u.id}')
self._logger.debug(__name__, f'Added user: {u.id}')
except Exception as e:
self._logger.error(__name__, f'Cannot get user', e)
def _check_servers(self):
self._logger.debug(__name__, f'Start checking Servers table')
for g in self._bot.guilds:
g: discord.Guild = g
try:
server = self._servers.find_server_by_discord_id(g.id)
if server is not None:
continue
self._logger.warn(__name__, f'Server not found in database: {g.id}')
self._logger.debug(__name__, f'Add server: {g.id}')
self._servers.add_server(Server(g.id))
self._db_context.save_changes()
server = self._servers.find_server_by_discord_id(g.id)
if server is None:
self._logger.fatal(__name__, f'Cannot add server: {g.id}')
self._logger.debug(__name__, f'Added server: {g.id}')
except Exception as e:
self._logger.error(__name__, f'Cannot get server', e)
results = self._servers.get_servers()
if results is None or len(results) == 0:
self._logger.error(__name__, f'Table Servers is empty!')
def _check_clients(self):
self._logger.debug(__name__, f'Start checking Clients table')
for g in self._bot.guilds:
g: discord.Guild = g
try:
server: Server = self._servers.find_server_by_discord_id(g.id)
if server is None:
self._logger.fatal(__name__, f'Server not found in database: {g.id}')
client = self._clients.find_client_by_server_id(server.server_id)
if client is not None:
continue
self._logger.warn(__name__, f'Client for server {g.id} not found in database: {self._bot.user.id}')
self._logger.debug(__name__, f'Add client: {self._bot.user.id}')
self._clients.add_client(Client(self._bot.user.id, 0, 0, 0, 0, 0, server))
self._db_context.save_changes()
client = self._clients.find_client_by_server_id(server.server_id)
if client is None:
self._logger.fatal(__name__, f'Cannot add client {self._bot.user.id} for server {g.id}')
self._logger.debug(__name__, f'Added client: {g.id}')
except Exception as e:
self._logger.error(__name__, f'Cannot get client', e)
results = self._servers.get_servers()
if results is None or len(results) == 0:
self._logger.error(__name__, f'Table Servers is empty!')
def _check_users(self):
self._logger.debug(__name__, f'Start checking Users table')
for g in self._bot.guilds:
g: discord.Guild = g
try:
server = self._servers.find_server_by_discord_id(g.id)
if server is None:
self._logger.fatal(__name__, f'Server not found in database: {g.id}')
for u in g.members:
u: Union[discord.Member, discord.User] = u
if u.bot:
self._logger.trace(__name__, f'User {u.id} is ignored, because its a bot')
continue
user = self._users.find_user_by_discord_id_and_server_id(u.id, server.server_id)
if user is not None:
continue
self._logger.warn(__name__, f'User not found in database: {u.id}')
self._logger.debug(__name__, f'Add user: {u.id}')
self._users.add_user(User(u.id, 0, server))
self._db_context.save_changes()
self._logger.debug(__name__, f'Added User: {u.id}')
except Exception as e:
self._logger.error(__name__, f'Cannot get User', e)
results = self._users.get_users()
if results is None or len(results) == 0:
self._logger.error(__name__, f'Table Users is empty!')
def _check_user_joins(self):
self._logger.debug(__name__, f'Start checking UserJoinedServers table')
for guild in self._bot.guilds:
guild: discord.Guild = guild
server = self._servers.find_server_by_discord_id(guild.id)
if server is None:
self._logger.fatal(__name__, f'Server not found in database: {guild.id}')
try:
for u in guild.members:
u: discord.User = u
if u.bot:
self._logger.trace(__name__, f'User {u.id} is ignored, because its a bot')
continue
user = self._users.find_user_by_discord_id_and_server_id(u.id, server.server_id)
if user is None:
self._logger.fatal(__name__, f'User not found in database: {u.id}')
join = self._user_joins.find_active_user_joined_server_by_user_id(user.user_id)
if join is not None:
continue
m: discord.Member = u
self._logger.warn(__name__, f'Active UserJoinedServer not found in database: {guild.id}:{u.id}@{m.joined_at}')
self._logger.debug(__name__, f'Add UserJoinedServer: {guild.id}:{u.id}@{m.joined_at}')
self._user_joins.add_user_joined_server(UserJoinedServer(user, m.joined_at, None))
self._db_context.save_changes()
self._logger.debug(__name__, f'Added UserJoinedServer: {u.id}')
except Exception as e:
self._logger.error(__name__, f'Cannot get UserJoinedServer', e)
results = self._users.get_users()
if results is None or len(results) == 0:
self._logger.error(__name__, f'Table Users is empty!')
joins = self._user_joins.get_user_joined_servers()
for join in joins:
join: UserJoinedServer = join
if join.user.server.discord_server_id != guild.id:
continue
if join.leaved_on is not None:
continue
dc_user = guild.get_member(join.user.discord_id)
if dc_user is None:
self._logger.warn(__name__, f'User {join.user.discord_id} already left the server.')
join.leaved_on = datetime.now()
self._user_joins.update_user_joined_server(join)
self._db_context.save_changes()
def _check_user_joins_vc(self):
self._logger.debug(__name__, f'Start checking UserJoinedVoiceChannel table')
for guild in self._bot.guilds:
guild: discord.Guild = guild
server = self._servers.find_server_by_discord_id(guild.id)
if server is None:
self._logger.fatal(__name__, f'Server not found in database: {guild.id}')
try:
for member in guild.members:
if member.bot:
self._logger.trace(__name__, f'User {member.id} is ignored, because its a bot')
continue
user = self._users.find_user_by_discord_id_and_server_id(member.id, server.server_id)
if user is None:
self._logger.fatal(__name__, f'User not found in database: {member.id}')
joins = self._user_joins_vc.find_active_user_joined_voice_channels_by_user_id(user.user_id)
if joins is None or len(joins) == 0:
continue
for join in joins:
self._logger.warn(__name__, f'Active UserJoinedVoiceChannel found in database: {guild.id}:{member.id}@{join.joined_on}')
join.leaved_on = datetime.now()
settings: BaseSettings = self._config.get_configuration(f'BaseServerSettings_{guild.id}')
if ((join.leaved_on - join.joined_on).total_seconds() / 60 / 60) > settings.max_voice_state_hours:
join.leaved_on = join.joined_on + timedelta(hours=settings.max_voice_state_hours)
self._user_joins_vc.update_user_joined_voice_channel(join)
# todo: maybe add XP
self._db_context.save_changes()
for member in guild.members:
if member.bot:
self._logger.trace(__name__, f'User {member.id} is ignored, because its a bot')
continue
if member.voice is None:
continue
user = self._users.find_user_by_discord_id_and_server_id(member.id, server.server_id)
if user is None:
self._logger.fatal(__name__, f'User not found in database: {member.id}')
join = UserJoinedVoiceChannel(user, member.voice.channel.id, datetime.now())
self._user_joins_vc.add_user_joined_voice_channel(join)
self._db_context.save_changes()
self._logger.warn(__name__, f'VS {member.voice}')
except Exception as e:
self._logger.error(__name__, f'Cannot get UserJoinedVoiceChannel', e)
async def on_ready(self):
self._logger.debug(__name__, f'Module {type(self)} started')
self._check_known_users()
self._check_servers()
self._check_clients()
self._check_users()
self._check_user_joins()
self._check_user_joins_vc()
self._validate_init_time()
self._logger.trace(__name__, f'Module {type(self)} stopped')

View File

@@ -0,0 +1 @@
# imports:

View File

@@ -0,0 +1,25 @@
# -*- coding: utf-8 -*-
"""
gismo sh-edraft Gismo
~~~~~~~~~~~~~~~~~~~
sh-edraft Dicord bot Gismo
:copyright: (c) 2021 - 2022 sh-edraft.de
:license: MIT, see LICENSE for more details.
"""
__title__ = 'modules.permission.abc'
__author__ = 'Sven Heidemann'
__license__ = 'MIT'
__copyright__ = 'Copyright (c) 2021 - 2022 sh-edraft.de'
__version__ = '0.4.2'
from collections import namedtuple
# imports
VersionInfo = namedtuple('VersionInfo', 'major minor micro')
version_info = VersionInfo(major='0', minor='4', micro='2')

View File

@@ -0,0 +1,39 @@
from abc import ABC, abstractmethod
import discord
class PermissionServiceABC(ABC):
@abstractmethod
def __init__(self): pass
@abstractmethod
def on_ready(self): pass
@abstractmethod
def on_member_update(self, before: discord.Member, after: discord.Member): pass
@abstractmethod
def get_admin_role_ids(self, g_id: int) -> list[int]: pass
@abstractmethod
def get_admin_roles(self, g_id: int) -> list[discord.Role]: pass
@abstractmethod
def get_admins(self, g_id: int) -> list[discord.Member]: pass
@abstractmethod
def get_moderator_role_ids(self, g_id: int) -> list[int]: pass
@abstractmethod
def get_moderator_roles(self, g_id: int) -> list[discord.Role]: pass
@abstractmethod
def get_moderators(self, g_id: int) -> list[discord.Member]: pass
@abstractmethod
def is_member_admin(self, member: discord.Member) -> bool: pass
@abstractmethod
def is_member_moderator(self, member: discord.Member) -> bool: pass

View File

@@ -0,0 +1,25 @@
# -*- coding: utf-8 -*-
"""
gismo sh-edraft Gismo
~~~~~~~~~~~~~~~~~~~
sh-edraft Dicord bot Gismo
:copyright: (c) 2021 - 2022 sh-edraft.de
:license: MIT, see LICENSE for more details.
"""
__title__ = 'modules.permission.configuration'
__author__ = 'Sven Heidemann'
__license__ = 'MIT'
__copyright__ = 'Copyright (c) 2021 - 2022 sh-edraft.de'
__version__ = '0.4.2'
from collections import namedtuple
# imports
VersionInfo = namedtuple('VersionInfo', 'major minor micro')
version_info = VersionInfo(major='0', minor='4', micro='2')

View File

@@ -0,0 +1,38 @@
import traceback
from cpl_core.configuration.configuration_model_abc import ConfigurationModelABC
from cpl_core.console import Console
class PermissionServerSettings(ConfigurationModelABC):
def __init__(self):
ConfigurationModelABC.__init__(self)
self._id: int = 0
self._admin_roles: list[int] = []
self._moderator_roles: list[int] = []
@property
def id(self) -> int:
return self._id
@property
def admin_roles(self) -> list[int]:
return self._admin_roles
@property
def moderator_roles(self) -> list[int]:
return self._moderator_roles
def from_dict(self, settings: dict):
try:
self._id = int(settings['Id'])
for index in settings['AdminRoleIds']:
self._admin_roles.append(int(index))
for index in settings['ModeratorRoleIds']:
self._moderator_roles.append(int(index))
except Exception as e:
Console.error(f'[ ERROR ] [ {__name__} ]: Reading error in {self.__name__} settings')
Console.error(f'[ EXCEPTION ] [ {__name__} ]: {e} -> {traceback.format_exc()}')

View File

@@ -0,0 +1,32 @@
import traceback
from cpl_core.configuration.configuration_model_abc import ConfigurationModelABC
from cpl_core.console import Console
from cpl_query.extension import List
from modules.permission.configuration.permission_server_settings import PermissionServerSettings
class PermissionSettings(ConfigurationModelABC):
def __init__(self):
ConfigurationModelABC.__init__(self)
self._servers: List[PermissionServerSettings] = List()
@property
def servers(self) -> List[PermissionServerSettings]:
return self._servers
def from_dict(self, settings: dict):
try:
servers = List(PermissionServerSettings)
for s in settings:
st = PermissionServerSettings()
settings[s]['Id'] = s
st.from_dict(settings[s])
servers.append(st)
self._servers = servers
except Exception as e:
Console.error(f'[ ERROR ] [ {__name__} ]: Reading error in {type(self).__name__} settings')
Console.error(f'[ EXCEPTION ] [ {__name__} ]: {e} -> {traceback.format_exc()}')

View File

@@ -0,0 +1,19 @@
import discord
from cpl_core.logging import LoggerABC
from cpl_discord.events import OnMemberUpdateABC
from modules.permission.abc.permission_service_abc import PermissionServiceABC
class PermissionOnMemberUpdateEvent(OnMemberUpdateABC):
def __init__(self, logger: LoggerABC, permission_service: PermissionServiceABC):
OnMemberUpdateABC.__init__(self)
self._logger = logger
self._permission_service = permission_service
async def on_member_update(self, before: discord.Member, after: discord.Member):
self._logger.debug(__name__, f'Module {type(self)} started')
if before.roles != after.roles:
self._permission_service.on_member_update(before, after)

View File

@@ -0,0 +1,16 @@
from cpl_core.logging import LoggerABC
from cpl_discord.events import OnReadyABC
from modules.permission.abc.permission_service_abc import PermissionServiceABC
class PermissionOnReadyEvent(OnReadyABC):
def __init__(self, logger: LoggerABC, permission_service: PermissionServiceABC):
OnReadyABC.__init__(self)
self._logger = logger
self._permission_service = permission_service
async def on_ready(self):
self._logger.debug(__name__, f'Module {type(self)} started')
self._permission_service.on_ready()

View File

@@ -0,0 +1,46 @@
{
"ProjectSettings": {
"Name": "permission",
"Version": {
"Major": "0",
"Minor": "0",
"Micro": "0"
},
"Author": "",
"AuthorEmail": "",
"Description": "",
"LongDescription": "",
"URL": "",
"CopyrightDate": "",
"CopyrightName": "",
"LicenseName": "",
"LicenseDescription": "",
"Dependencies": [
"cpl-core>=2022.7.0.post2"
],
"DevDependencies": [
"cpl-cli>=2022.7.0.post2"
],
"PythonVersion": ">=3.10.4",
"PythonPath": {
"linux": ""
},
"Classifiers": []
},
"BuildSettings": {
"ProjectType": "library",
"SourcePath": "",
"OutputPath": "../../dist",
"Main": "permission.main",
"EntryPoint": "permission",
"IncludePackageData": false,
"Included": [],
"Excluded": [
"*/__pycache__",
"*/logs",
"*/tests"
],
"PackageData": {},
"ProjectReferences": []
}
}

View File

@@ -0,0 +1,25 @@
# -*- coding: utf-8 -*-
"""
gismo sh-edraft Gismo
~~~~~~~~~~~~~~~~~~~
sh-edraft Dicord bot Gismo
:copyright: (c) 2021 - 2022 sh-edraft.de
:license: MIT, see LICENSE for more details.
"""
__title__ = 'modules.permission.service'
__author__ = 'Sven Heidemann'
__license__ = 'MIT'
__copyright__ = 'Copyright (c) 2021 - 2022 sh-edraft.de'
__version__ = '0.4.2'
from collections import namedtuple
# imports
VersionInfo = namedtuple('VersionInfo', 'major minor micro')
version_info = VersionInfo(major='0', minor='4', micro='2')

View File

@@ -0,0 +1,126 @@
import discord
from cpl_core.logging import LoggerABC
from cpl_core.configuration import ConfigurationABC
from cpl_discord.service import DiscordBotServiceABC
from modules.permission.abc.permission_service_abc import PermissionServiceABC
from modules.permission.configuration.permission_server_settings import PermissionServerSettings
class PermissionService(PermissionServiceABC):
def __init__(self, logger: LoggerABC, bot: DiscordBotServiceABC, config: ConfigurationABC):
PermissionServiceABC.__init__(self)
self._logger = logger
self._bot = bot
self._config = config
self._admin_role_ids: dict[int, list[int]] = {}
self._admin_roles: dict[int, list[discord.Role]] = {}
self._admins: dict[int, list[discord.Member]] = {}
self._moderator_role_ids: dict[int, list[int]] = {}
self._moderator_roles: dict[int, list[discord.Role]] = {}
self._moderators: dict[int, list[discord.Member]] = {}
def on_ready(self):
for guild in self._bot.guilds:
guild: discord.Guild = guild
self._logger.debug(__name__, f'Validate permission settings')
settings: PermissionServerSettings = self._config.get_configuration(f'PermissionServerSettings_{guild.id}')
if settings is None:
self._logger.error(__name__, 'Permission settings not found')
return
self._admin_role_ids[guild.id] = settings.admin_roles
self._moderator_role_ids[guild.id] = settings.moderator_roles
admin_roles = []
admins = []
mod_roles = []
mods = []
for role in guild.roles:
role: discord.Role = role
if role.id in self._admin_role_ids[guild.id]:
admin_roles.append(role)
self._logger.trace(__name__, f'Added admin role {role}')
for member in role.members:
admins.append(member)
self._logger.trace(__name__, f'Added admin {member}')
if role.id in self._moderator_role_ids[guild.id]:
mod_roles.append(role)
self._logger.trace(__name__, f'Added moderator role {role}')
for member in role.members:
mods.append(member)
self._logger.trace(__name__, f'Added moderator {member}')
self._admin_roles[guild.id] = admin_roles
self._admins[guild.id] = admins
self._moderator_roles[guild.id] = mod_roles
self._moderators[guild.id] = mods
self._logger.error(__name__, f'USERS {self._admins} {self._moderators}')
def on_member_update(self, before: discord.Member, after: discord.Member):
g_id = after.guild.id
for admin_role in self._admin_roles[g_id]:
if admin_role in before.roles and admin_role not in after.roles:
self._admins[g_id].remove(after)
self._logger.trace(__name__, f'Removed {after.id} from admins')
elif admin_role in after.roles and admin_role not in before.roles:
self._admins[g_id].append(after)
self._logger.trace(__name__, f'Added {after.id} to admins')
for moderator_role in self._moderator_roles[g_id]:
if moderator_role in before.roles and moderator_role not in after.roles:
self._moderators[g_id].remove(after)
self._logger.trace(__name__, f'Removed {after.id} from moderators')
elif moderator_role in after.roles and moderator_role not in before.roles:
self._moderators[g_id].append(after)
self._logger.trace(__name__, f'Added {after.id} to moderators')
def get_admin_role_ids(self, g_id: int) -> list[int]:
return self._admin_role_ids[g_id]
def get_admin_roles(self, g_id: int) -> list[discord.Role]:
return self._admin_roles[g_id]
def get_admins(self, g_id: int) -> list[discord.Member]:
return self._admins[g_id]
def get_moderator_role_ids(self, g_id: int) -> list[int]:
return self._moderator_role_ids[g_id]
def get_moderator_roles(self, g_id: int) -> list[discord.Role]:
return self._moderator_roles[g_id]
def get_moderators(self, g_id: int) -> list[discord.Member]:
return self._moderators[g_id]
def is_member_admin(self, member: discord.Member) -> bool:
role_match = False
for role in member.roles:
if role in self._admin_roles:
role_match = True
return member in self._admins[member.guild.id] or role_match
def is_member_moderator(self, member: discord.Member) -> bool:
role_match = False
for role in member.roles:
if role in self._moderator_roles:
role_match = True
return member in self._moderators[member.guild.id] or role_match or self.is_member_admin(member)