Moved folders #405

This commit is contained in:
2023-10-13 17:10:00 +02:00
parent eb32bec43c
commit f435d3dd48
807 changed files with 3801 additions and 1297 deletions

View File

@@ -0,0 +1,26 @@
# -*- coding: utf-8 -*-
"""
bot sh-edraft.de Discord bot
~~~~~~~~~~~~~~~~~~~
Discord bot for customers of sh-edraft.de
:copyright: (c) 2022 - 2023 sh-edraft.de
:license: MIT, see LICENSE for more details.
"""
__title__ = "modules.base"
__author__ = "Sven Heidemann"
__license__ = "MIT"
__copyright__ = "Copyright (c) 2022 - 2023 sh-edraft.de"
__version__ = "1.2.0"
from collections import namedtuple
# imports
VersionInfo = namedtuple("VersionInfo", "major minor micro")
version_info = VersionInfo(major="1", minor="2", micro="0")

View File

@@ -0,0 +1,44 @@
{
"ProjectSettings": {
"Name": "base",
"Version": {
"Major": "1",
"Minor": "2",
"Micro": "0"
},
"Author": "",
"AuthorEmail": "",
"Description": "",
"LongDescription": "",
"URL": "",
"CopyrightDate": "",
"CopyrightName": "",
"LicenseName": "",
"LicenseDescription": "",
"Dependencies": [
"cpl-core==2022.12.0"
],
"DevDependencies": [
"cpl-cli==2022.12.0"
],
"PythonVersion": ">=3.10.4",
"PythonPath": {},
"Classifiers": []
},
"BuildSettings": {
"ProjectType": "library",
"SourcePath": "",
"OutputPath": "../../dist",
"Main": "base.main",
"EntryPoint": "base",
"IncludePackageData": false,
"Included": [],
"Excluded": [
"*/__pycache__",
"*/logs",
"*/tests"
],
"PackageData": {},
"ProjectReferences": []
}
}

View File

@@ -0,0 +1,133 @@
from cpl_core.configuration import ConfigurationABC
from cpl_core.dependency_injection import ServiceCollectionABC
from cpl_core.environment import ApplicationEnvironmentABC
from cpl_discord.discord_event_types_enum import DiscordEventTypesEnum
from cpl_discord.service.discord_collection_abc import DiscordCollectionABC
from bot_core.abc.module_abc import ModuleABC
from bot_core.abc.task_abc import TaskABC
from bot_core.configuration.feature_flags_enum import FeatureFlagsEnum
from modules.base.birthday_watcher import BirthdayWatcher
from modules.base.command.afk_command import AFKCommand
from modules.base.command.game_server_group import GameServerGroup
from modules.base.command.help_command import HelpCommand
from modules.base.command.info_command import InfoCommand
from modules.base.command.make_coffee_command import MakeCoffeeCommand
from modules.base.command.mass_move_command import MassMoveCommand
from modules.base.command.ping_command import PingCommand
from modules.base.command.presence_command import PresenceCommand
from modules.base.command.purge_command import PurgeCommand
from modules.base.command.register_group import RegisterGroup
from modules.base.command.submit_group import SubmitGroup
from modules.base.command.unregister_group import UnregisterGroup
from modules.base.command.user_group import UserGroup
from modules.base.events.base_on_command_error_event import BaseOnCommandErrorEvent
from modules.base.events.base_on_command_event import BaseOnCommandEvent
from modules.base.events.base_on_guild_join_event import BaseOnGuildJoinEvent
from modules.base.events.base_on_member_join_event import BaseOnMemberJoinEvent
from modules.base.events.base_on_member_remove_event import BaseOnMemberRemoveEvent
from modules.base.events.base_on_message_delete_event import BaseOnMessageDeleteEvent
from modules.base.events.base_on_message_event import BaseOnMessageEvent
from modules.base.events.base_on_raw_reaction_add import BaseOnRawReactionAddEvent
from modules.base.events.base_on_raw_reaction_remove import BaseOnRawReactionRemoveEvent
from modules.base.events.base_on_scheduled_event_update_event import (
BaseOnScheduledEventUpdateEvent,
)
from modules.base.events.base_on_voice_state_update_event import (
BaseOnVoiceStateUpdateEvent,
)
from modules.base.events.base_on_voice_state_update_event_help_channel import (
BaseOnVoiceStateUpdateEventHelpChannel,
)
from modules.base.events.base_on_voice_state_update_event_scheduled_event_bonus import (
BaseOnVoiceStateUpdateEventScheduledEventBonus,
)
from modules.base.forms.bug_report_form import BugReportForm
from modules.base.forms.complaint_form import ComplaintForm
from modules.base.helper.base_reaction_handler import BaseReactionHandler
from modules.base.service.event_service import EventService
from modules.base.service.user_warnings_service import UserWarningsService
class BaseModule(ModuleABC):
def __init__(self, dc: DiscordCollectionABC):
ModuleABC.__init__(self, dc, FeatureFlagsEnum.base_module)
def configure_configuration(
self, config: ConfigurationABC, env: ApplicationEnvironmentABC
):
pass
def configure_services(
self, services: ServiceCollectionABC, env: ApplicationEnvironmentABC
):
services.add_transient(BaseReactionHandler)
services.add_singleton(EventService)
services.add_transient(UserWarningsService)
services.add_singleton(TaskABC, BirthdayWatcher)
# forms
services.add_transient(BugReportForm)
services.add_transient(ComplaintForm)
# commands
services.add_transient(AFKCommand)
services.add_transient(SubmitGroup)
services.add_transient(HelpCommand)
services.add_transient(InfoCommand)
services.add_transient(MassMoveCommand)
services.add_transient(PingCommand)
services.add_transient(PresenceCommand)
services.add_transient(MakeCoffeeCommand)
services.add_transient(PurgeCommand)
services.add_transient(UserGroup)
services.add_transient(RegisterGroup)
services.add_transient(UnregisterGroup)
services.add_transient(GameServerGroup)
# events
services.add_transient(
DiscordEventTypesEnum.on_command.value, BaseOnCommandEvent
)
services.add_transient(
DiscordEventTypesEnum.on_command_error.value, BaseOnCommandErrorEvent
)
services.add_transient(
DiscordEventTypesEnum.on_member_join.value, BaseOnMemberJoinEvent
)
services.add_transient(
DiscordEventTypesEnum.on_member_remove.value, BaseOnMemberRemoveEvent
)
services.add_transient(
DiscordEventTypesEnum.on_message.value, BaseOnMessageEvent
)
services.add_transient(
DiscordEventTypesEnum.on_message_delete.value, BaseOnMessageDeleteEvent
)
services.add_transient(
DiscordEventTypesEnum.on_raw_reaction_add.value, BaseOnRawReactionAddEvent
)
services.add_transient(
DiscordEventTypesEnum.on_raw_reaction_remove.value,
BaseOnRawReactionRemoveEvent,
)
services.add_transient(
DiscordEventTypesEnum.on_voice_state_update.value,
BaseOnVoiceStateUpdateEvent,
)
services.add_transient(
DiscordEventTypesEnum.on_voice_state_update.value,
BaseOnVoiceStateUpdateEventHelpChannel,
)
services.add_transient(
DiscordEventTypesEnum.on_voice_state_update.value,
BaseOnVoiceStateUpdateEventScheduledEventBonus,
)
services.add_transient(
DiscordEventTypesEnum.on_scheduled_event_update.value,
BaseOnScheduledEventUpdateEvent,
)
services.add_transient(
DiscordEventTypesEnum.on_guild_join.value,
BaseOnGuildJoinEvent,
)

View File

@@ -0,0 +1,71 @@
import datetime
from cpl_core.configuration import ConfigurationABC
from cpl_core.database.context import DatabaseContextABC
from cpl_discord.service import DiscordBotServiceABC
from cpl_translation import TranslatePipe
from discord.ext import tasks
from bot_core.abc.task_abc import TaskABC
from bot_core.logging.task_logger import TaskLogger
from bot_core.service.message_service import MessageService
from bot_data.abc.user_repository_abc import UserRepositoryABC
from bot_data.model.server_config import ServerConfig
class BirthdayWatcher(TaskABC):
def __init__(
self,
config: ConfigurationABC,
logger: TaskLogger,
bot: DiscordBotServiceABC,
db: DatabaseContextABC,
users: UserRepositoryABC,
message_service: MessageService,
t: TranslatePipe,
):
TaskABC.__init__(self)
self._config = config
self._logger = logger
self._bot = bot
self._db = db
self._users = users
self._message_service = message_service
self._t = t
self.watch.start()
@tasks.loop(time=datetime.time(hour=8, minute=0))
async def watch(self):
self._logger.info(__name__, "Watching birthdays")
try:
today = datetime.date.today()
users = self._users.get_users().where(lambda x: x.birthday is not None)
for user in users:
if user.birthday.day != today.day or user.birthday.month != today.month:
continue
settings: ServerConfig = self._config.get_configuration(
f"ServerConfig_{user.server.discord_id}"
)
user.xp += settings.xp_for_birthday
self._users.update_user(user)
self._db.save_changes()
guild = self._bot.get_guild(user.server.discord_id)
member = guild.get_member(user.discord_id)
await self._message_service.send_channel_message(
self._bot.get_channel(settings.notification_chat_id),
self._t.transform("modules.base.user.birthday.has_birthday").format(
member.mention
),
is_persistent=True,
)
except Exception as e:
self._logger.error(__name__, f"Watching birthdays failed", e)
@watch.before_loop
async def wait(self):
await self._wait_until_ready()

View File

@@ -0,0 +1,26 @@
# -*- coding: utf-8 -*-
"""
bot sh-edraft.de Discord bot
~~~~~~~~~~~~~~~~~~~
Discord bot for customers of sh-edraft.de
:copyright: (c) 2022 - 2023 sh-edraft.de
:license: MIT, see LICENSE for more details.
"""
__title__ = "modules.base.command"
__author__ = "Sven Heidemann"
__license__ = "MIT"
__copyright__ = "Copyright (c) 2022 - 2023 sh-edraft.de"
__version__ = "1.2.0"
from collections import namedtuple
# imports
VersionInfo = namedtuple("VersionInfo", "major minor micro")
version_info = VersionInfo(major="1", minor="2", micro="0")

View File

@@ -0,0 +1,73 @@
from cpl_core.configuration import ConfigurationABC
from cpl_discord.command import DiscordCommandABC
from cpl_discord.service import DiscordBotServiceABC
from cpl_translation import TranslatePipe
from discord import VoiceChannel
from discord.ext import commands
from discord.ext.commands import Context
from bot_core.abc.client_utils_abc import ClientUtilsABC
from bot_core.abc.message_service_abc import MessageServiceABC
from bot_core.helper.command_checks import CommandChecks
from bot_core.logging.command_logger import CommandLogger
from bot_data.model.server_config import ServerConfig
class AFKCommand(DiscordCommandABC):
def __init__(
self,
logger: CommandLogger,
config: ConfigurationABC,
message_service: MessageServiceABC,
bot: DiscordBotServiceABC,
client_utils: ClientUtilsABC,
translate: TranslatePipe,
):
DiscordCommandABC.__init__(self)
self._logger = logger
self._config = config
self._message_service = message_service
self._bot = bot
self._client_utils = client_utils
self._t = translate
self._logger.trace(__name__, f"Loaded command service: {type(self).__name__}")
@commands.hybrid_command()
@commands.guild_only()
@CommandChecks.check_is_ready()
async def afk(self, ctx: Context):
self._logger.debug(__name__, f"Received command afk {ctx}")
settings: ServerConfig = self._config.get_configuration(
f"ServerConfig_{ctx.guild.id}"
)
if ctx.author.voice is None or ctx.author.voice.channel is None:
await self._message_service.send_ctx_msg(
ctx,
self._t.transform("modules.base.afk_command_channel_missing_message"),
)
self._logger.trace(__name__, f"Finished afk command")
return
self._bot.loop.create_task(
self._message_service.send_ctx_msg(
ctx, self._t.transform("modules.base.afk_command_move_message")
)
)
channel: VoiceChannel = ctx.guild.get_channel(settings.afk_command_channel_id)
try:
await ctx.author.move_to(channel)
self._client_utils.moved_user(ctx.guild.id)
except Exception as e:
self._logger.error(
__name__,
f"Cannot move user {ctx.author.id} to channel {ctx.channel.id}",
e,
)
await self._message_service.send_ctx_msg(
ctx, self._t.transform("common.no_permission_message")
)
self._logger.trace(__name__, f"Finished afk command")

View File

@@ -0,0 +1,275 @@
from typing import List as TList
import discord
from cpl_core.configuration import ConfigurationABC
from cpl_core.database.context import DatabaseContextABC
from cpl_discord.command import DiscordCommandABC
from cpl_discord.service import DiscordBotServiceABC
from cpl_translation import TranslatePipe
from discord import app_commands
from discord.ext import commands
from discord.ext.commands import Context
from bot_core.abc.client_utils_abc import ClientUtilsABC
from bot_core.abc.message_service_abc import MessageServiceABC
from bot_core.configuration.feature_flags_enum import FeatureFlagsEnum
from bot_core.configuration.feature_flags_settings import FeatureFlagsSettings
from bot_core.helper.command_checks import CommandChecks
from bot_core.logging.command_logger import CommandLogger
from bot_data.abc.api_key_repository_abc import ApiKeyRepositoryABC
from bot_data.abc.game_server_repository_abc import GameServerRepositoryABC
from bot_data.abc.server_repository_abc import ServerRepositoryABC
from bot_data.abc.user_game_ident_repository_abc import UserGameIdentRepositoryABC
from bot_data.abc.user_repository_abc import UserRepositoryABC
from bot_data.model.game_server import GameServer
from bot_data.model.server_config import ServerConfig
from modules.permission.abc.permission_service_abc import PermissionServiceABC
class GameServerGroup(DiscordCommandABC):
def __init__(
self,
config: ConfigurationABC,
logger: CommandLogger,
message_service: MessageServiceABC,
bot: DiscordBotServiceABC,
client_utils: ClientUtilsABC,
translate: TranslatePipe,
servers: ServerRepositoryABC,
user: UserRepositoryABC,
game_servers: GameServerRepositoryABC,
user_game_idents: UserGameIdentRepositoryABC,
api_keys: ApiKeyRepositoryABC,
db: DatabaseContextABC,
permission_service: PermissionServiceABC,
):
DiscordCommandABC.__init__(self)
self._config = config
self._logger = logger
self._message_service = message_service
self._bot = bot
self._client_utils = client_utils
self._t = translate
self._servers = servers
self._user = user
self._game_servers = game_servers
self._user_game_idents = user_game_idents
self._api_keys = api_keys
self._db = db
self._permissions = permission_service
self._logger.trace(__name__, f"Loaded command service: {type(self).__name__}")
@commands.hybrid_group(name="game-server")
@commands.guild_only()
async def game_server(self, ctx: Context):
pass
@game_server.command(alias="game-servers")
@commands.guild_only()
@CommandChecks.check_is_ready()
@CommandChecks.check_is_member_moderator()
async def list(self, ctx: Context, wait: int = None):
self._logger.debug(__name__, f"Received command game_server list {ctx}")
if ctx.guild is None:
return
settings: ServerConfig = self._config.get_configuration(
f"ServerConfig_{ctx.guild.id}"
)
if not FeatureFlagsSettings.get_flag_from_dict(
settings.feature_flags, FeatureFlagsEnum.game_server
):
await self._message_service.send_ctx_msg(
ctx, self._t.transform("common.feature_not_activated")
)
return
server = self._servers.get_server_by_discord_id(ctx.guild.id)
game_servers = self._game_servers.get_game_servers_by_server_id(server.id)
if game_servers.count() < 1:
await self._message_service.send_ctx_msg(
ctx, self._t.transform("modules.base.game_server.error.nothing_found")
)
self._logger.trace(__name__, f"Finished command game_server list")
return
game_server_name = ""
api_key = ""
for game_server in game_servers:
game_server: GameServer = game_server
game_server_name += f"\n{game_server.name}"
api_key += f"\n{game_server.api_key.identifier}"
embed = discord.Embed(
title=self._t.transform("modules.base.game_server.list.title"),
description=self._t.transform("modules.base.game_server.list.description"),
color=int("ef9d0d", 16),
)
embed.add_field(
name=self._t.transform("modules.base.game_server.list.name"),
value=game_server_name,
inline=True,
)
embed.add_field(
name=self._t.transform("modules.base.game_server.list.api_key"),
value=api_key,
inline=True,
)
await self._message_service.send_ctx_msg(ctx, embed, wait_before_delete=wait)
self._logger.trace(__name__, f"Finished command game_server list")
@game_server.command()
@commands.guild_only()
@CommandChecks.check_is_ready()
@CommandChecks.check_is_member_admin()
async def add(self, ctx: Context, name: str, api_key_id: int):
self._logger.debug(
__name__, f"Received command game-server add {ctx}: {name} {api_key_id}"
)
if ctx.guild is None:
return
settings: ServerConfig = self._config.get_configuration(
f"ServerConfig_{ctx.guild.id}"
)
if not FeatureFlagsSettings.get_flag_from_dict(
settings.feature_flags, FeatureFlagsEnum.game_server
):
await self._message_service.send_ctx_msg(
ctx, self._t.transform("common.feature_not_activated")
)
return
server = self._servers.get_server_by_discord_id(ctx.guild.id)
api_key = self._api_keys.get_api_key_by_id(api_key_id)
game_server = GameServer(name, server, api_key)
self._game_servers.add_game_server(game_server)
self._db.save_changes()
await self._message_service.send_ctx_msg(
ctx,
self._t.transform("modules.base.game_server.add.success").format(name),
)
self._logger.trace(__name__, f"Finished command game-server add")
@add.autocomplete("api_key_id")
async def api_key_id_autocomplete(
self, interaction: discord.Interaction, current: str
) -> TList[app_commands.Choice[str]]:
keys = self._api_keys.get_api_keys()
return [
app_commands.Choice(name=f"{key.identifier}: {key.key}", value=key.id)
for key in self._client_utils.get_auto_complete_list(
keys, current, lambda x: x.key
)
]
@game_server.command()
@commands.guild_only()
@CommandChecks.check_is_ready()
@CommandChecks.check_is_member_admin()
async def remove(self, ctx: Context, id: int):
self._logger.debug(__name__, f"Received command game-server remove {ctx}: {id}")
if ctx.guild is None:
return
settings: ServerConfig = self._config.get_configuration(
f"ServerConfig_{ctx.guild.id}"
)
if not FeatureFlagsSettings.get_flag_from_dict(
settings.feature_flags, FeatureFlagsEnum.game_server
):
await self._message_service.send_ctx_msg(
ctx, self._t.transform("common.feature_not_activated")
)
return
game_server = self._game_servers.get_game_server_by_id(id)
self._game_servers.delete_game_server(game_server)
self._db.save_changes()
await self._message_service.send_ctx_msg(
ctx, self._t.transform("modules.base.game_server.remove.success")
)
self._logger.trace(__name__, f"Finished command game-server remove")
@remove.autocomplete("id")
async def id_autocomplete(
self, interaction: discord.Interaction, current: str
) -> TList[app_commands.Choice[str]]:
server = self._servers.get_server_by_discord_id(interaction.guild.id)
game_servers = self._game_servers.get_game_servers_by_server_id(server.id)
return [
app_commands.Choice(name=gs.name, value=gs.id)
for gs in self._client_utils.get_auto_complete_list(
game_servers, current, lambda x: x.name
)
]
@game_server.command(name="list-members")
@commands.guild_only()
@CommandChecks.check_is_ready()
@CommandChecks.check_is_member_moderator()
async def list_members(self, ctx: Context, id: int, wait: int = None):
self._logger.debug(__name__, f"Received command game-server remove {ctx}: {id}")
if ctx.guild is None:
return
settings: ServerConfig = self._config.get_configuration(
f"ServerConfig_{ctx.guild.id}"
)
if not FeatureFlagsSettings.get_flag_from_dict(
settings.feature_flags, FeatureFlagsEnum.game_server
):
await self._message_service.send_ctx_msg(
ctx, self._t.transform("common.feature_not_activated")
)
return
game_server = self._game_servers.get_game_server_by_id(id)
game_idents = self._user_game_idents.get_user_game_idents_by_game_server_id(
game_server.id
)
users = ""
for game_ident in game_idents:
users += f"\n{game_ident.user.name}"
embed = discord.Embed(
title=self._t.transform("modules.base.game_server.list_members.title"),
description=self._t.transform(
"modules.base.game_server.list_members.description"
),
color=int("ef9d0d", 16),
)
embed.add_field(
name=self._t.transform("modules.base.game_server.list_members.users"),
value=users,
inline=True,
)
await self._message_service.send_ctx_msg(ctx, embed, wait_before_delete=wait)
self._logger.trace(__name__, f"Finished command game-server remove")
@list_members.autocomplete("id")
async def id_autocomplete(
self, interaction: discord.Interaction, current: str
) -> TList[app_commands.Choice[str]]:
server = self._servers.get_server_by_discord_id(interaction.guild.id)
game_servers = self._game_servers.get_game_servers_by_server_id(server.id)
return [
app_commands.Choice(name=gs.name, value=gs.id)
for gs in self._client_utils.get_auto_complete_list(
game_servers, current, lambda x: x.name
)
]

View File

@@ -0,0 +1,57 @@
from typing import List
import discord
from cpl_core.configuration import ConfigurationABC
from cpl_discord.command import DiscordCommandABC
from cpl_discord.service import DiscordBotServiceABC
from discord import app_commands
from discord.ext import commands
from discord.ext.commands import Context
from bot_core.abc.client_utils_abc import ClientUtilsABC
from bot_core.abc.message_service_abc import MessageServiceABC
from bot_core.helper.command_checks import CommandChecks
from bot_core.logging.command_logger import CommandLogger
from bot_data.model.technician_config import TechnicianConfig
class HelpCommand(DiscordCommandABC):
def __init__(
self,
config: ConfigurationABC,
logger: CommandLogger,
message_service: MessageServiceABC,
bot: DiscordBotServiceABC,
client_utils: ClientUtilsABC,
):
DiscordCommandABC.__init__(self)
self._config = config
self._logger = logger
self._message_service = message_service
self._bot = bot
self._client_utils = client_utils
self._logger.trace(__name__, f"Loaded command service: {type(self).__name__}")
@commands.hybrid_command()
@commands.guild_only()
@CommandChecks.check_is_ready()
async def help(self, ctx: Context, persistent_flag: str = None):
self._logger.debug(__name__, f"Received command help {ctx}:{persistent_flag}")
settings: TechnicianConfig = self._config.get_configuration(TechnicianConfig)
is_persistent = persistent_flag == "--stay"
await self._message_service.send_ctx_msg(
ctx,
settings.help_command_reference_url,
is_persistent=is_persistent,
is_public=True,
)
self._logger.trace(__name__, f"Finished help command")
@help.autocomplete("persistent_flag")
async def help_autocomplete(
self, interaction: discord.Interaction, current: str
) -> List[app_commands.Choice[str]]:
flags = ["--stay"]
return [app_commands.Choice(name=key, value=key) for key in flags]

View File

@@ -0,0 +1,101 @@
from datetime import datetime
import discord
from cpl_core.configuration import ConfigurationABC
from cpl_discord.command import DiscordCommandABC
from cpl_discord.service import DiscordBotServiceABC
from cpl_translation import TranslatePipe
from discord.ext import commands
from discord.ext.commands import Context
import bot
from bot_core.abc.client_utils_abc import ClientUtilsABC
from bot_core.abc.message_service_abc import MessageServiceABC
from bot_core.helper.command_checks import CommandChecks
from bot_core.logging.command_logger import CommandLogger
class InfoCommand(DiscordCommandABC):
def __init__(
self,
config: ConfigurationABC,
logger: CommandLogger,
message_service: MessageServiceABC,
bot: DiscordBotServiceABC,
client_utils: ClientUtilsABC,
translate: TranslatePipe,
):
DiscordCommandABC.__init__(self)
self._config = config
self._logger = logger
self._message_service = message_service
self._bot = bot
self._client_utils = client_utils
self._t = translate
self._logger.trace(__name__, f"Loaded command service: {type(self).__name__}")
@commands.hybrid_command()
@commands.guild_only()
@CommandChecks.check_is_ready()
async def info(self, ctx: Context, *, wait: int = None):
self._logger.debug(__name__, f"Received command info {ctx},{wait}")
client = self._client_utils.get_client(self._bot.user.id, ctx.guild.id)
embed = discord.Embed(
title=self._t.transform("modules.base.info.title"),
description=self._t.transform("modules.base.info.description"),
color=int("ef9d0d", 16),
)
embed.add_field(
name=self._t.transform("modules.base.info.fields.version"),
value=bot.__version__,
)
start_time = self._config.get_configuration("Bot_StartTime")
ontime = round(
(
datetime.now() - datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S.%f")
).total_seconds()
/ 3600,
2,
)
embed.add_field(
name=self._t.transform("modules.base.info.fields.ontime"),
value=f"{ontime}h",
)
embed.add_field(
name=self._t.transform("modules.base.info.fields.sent_message_count"),
value=client.sent_message_count,
inline=False,
)
embed.add_field(
name=self._t.transform("modules.base.info.fields.received_message_count"),
value=client.received_message_count,
)
embed.add_field(
name=self._t.transform("modules.base.info.fields.deleted_message_count"),
value=client.deleted_message_count,
inline=False,
)
embed.add_field(
name=self._t.transform("modules.base.info.fields.received_command_count"),
value=client.received_command_count,
)
embed.add_field(
name=self._t.transform("modules.base.info.fields.moved_users_count"),
value=client.moved_users_count,
)
from bot.module_list import ModuleList
modules = ModuleList.get_modules()
modules = modules.select(lambda x: x.__name__.replace("Module", ""))
embed.add_field(
name=self._t.transform("modules.base.info.fields.modules"),
value="\n".join(modules),
inline=False,
)
await self._message_service.send_ctx_msg(ctx, embed, wait_before_delete=wait)
self._logger.trace(__name__, f"Finished info command")

View File

@@ -0,0 +1,17 @@
from cpl_core.logging import LoggerABC
from cpl_discord.command import DiscordCommandABC
from cpl_discord.service import DiscordBotServiceABC
from discord.ext import commands
from discord.ext.commands import Context
class MakeCoffeeCommand(DiscordCommandABC):
def __init__(self, logger: LoggerABC, bot: DiscordBotServiceABC):
DiscordCommandABC.__init__(self)
self._logger = logger
self._bot = bot
@commands.hybrid_command(name="make-coffee")
async def make_coffee(self, ctx: Context):
await ctx.send("https://media.giphy.com/media/M4ecx9P2jI4tq/giphy.gif")

View File

@@ -0,0 +1,66 @@
import asyncio
import discord
from cpl_discord.command import DiscordCommandABC
from cpl_discord.service import DiscordBotServiceABC
from cpl_translation import TranslatePipe
from discord.app_commands import Transform
from discord.ext import commands
from discord.ext.commands import Context
from bot_core.abc.message_service_abc import MessageServiceABC
from bot_core.helper.command_checks import CommandChecks
from bot_core.logging.command_logger import CommandLogger
from bot_core.service.client_utils_service import ClientUtilsService
from modules.base.helper.voice_channel_transformer import VoiceChannelTransformer
class MassMoveCommand(DiscordCommandABC):
def __init__(
self,
logger: CommandLogger,
message_service: MessageServiceABC,
bot: DiscordBotServiceABC,
translate: TranslatePipe,
client_utils: ClientUtilsService,
):
DiscordCommandABC.__init__(self)
self._logger = logger
self._message_service = message_service
self._bot = bot
self._t = translate
self._client_utils = client_utils
@commands.hybrid_command(name="mass-move")
@CommandChecks.check_is_ready()
@CommandChecks.check_is_member_moderator()
async def mass_move(
self,
ctx: Context,
channel_to: discord.VoiceChannel,
channel_from: Transform[str, VoiceChannelTransformer] = None,
):
self._logger.debug(__name__, f"Received command mass-move {ctx}")
if channel_from is None and ctx.author.voice is None:
await self._message_service.send_ctx_msg(
ctx, self._t.transform("modules.base.mass_move.channel_from_error")
)
return
if channel_from is None:
channel_from = ctx.author.voice.channel
moves = [member.move_to(channel_to) for member in channel_from.members]
move_count = len(moves)
await asyncio.gather(*moves)
self._client_utils.moved_users(ctx.guild.id, move_count)
await self._message_service.send_ctx_msg(
ctx,
self._t.transform("modules.base.mass_move.moved").format(
channel_from.mention, channel_to.mention
),
)
self._logger.trace(__name__, f"Finished mass-move command")

View File

@@ -0,0 +1,69 @@
import discord
from cpl_discord.command import DiscordCommandABC
from cpl_discord.service import DiscordBotServiceABC
from cpl_translation import TranslatePipe
from discord.ext import commands
from discord.ext.commands import Context
from bot_core.abc.client_utils_abc import ClientUtilsABC
from bot_core.abc.message_service_abc import MessageServiceABC
from bot_core.helper.command_checks import CommandChecks
from bot_core.logging.command_logger import CommandLogger
from bot_data.abc.server_repository_abc import ServerRepositoryABC
from bot_data.model.technician_config import TechnicianConfig
from modules.permission.abc.permission_service_abc import PermissionServiceABC
class PingCommand(DiscordCommandABC):
def __init__(
self,
logger: CommandLogger,
message_service: MessageServiceABC,
bot: DiscordBotServiceABC,
client_utils: ClientUtilsABC,
translate: TranslatePipe,
permissions: PermissionServiceABC,
servers: ServerRepositoryABC,
settings: TechnicianConfig,
):
DiscordCommandABC.__init__(self)
self._logger = logger
self._message_service = message_service
self._bot = bot
self._client_utils = client_utils
self._t = translate
self._permissions = permissions
self._servers = servers
self._settings = settings
self._logger.trace(__name__, f"Loaded command service: {type(self).__name__}")
@staticmethod
def _get_ping(url: str) -> float:
from icmplib import ping
ping_result = ping(url, count=4, interval=0.2, privileged=False)
return ping_result.avg_rtt
@commands.hybrid_command()
@commands.guild_only()
@CommandChecks.check_is_ready()
async def ping(self, ctx: Context):
self._logger.debug(__name__, f"Received command ping {ctx}")
if self._permissions.is_member_technician(ctx.author):
embed = discord.Embed(
title=self._t.transform("modules.base.info.title"),
description=self._t.transform("modules.base.info.description"),
color=int("ef9d0d", 16),
)
for server in self._settings.ping_urls:
embed.add_field(
name=server, value=f"{self._get_ping(server)} ms", inline=False
)
await self._message_service.send_ctx_msg(ctx, embed)
else:
await self._message_service.send_ctx_msg(
ctx, self._t.transform("modules.base.pong")
)
self._logger.trace(__name__, f"Finished ping command")

View File

@@ -0,0 +1,53 @@
import discord
from cpl_discord.command import DiscordCommandABC
from cpl_discord.service import DiscordBotServiceABC
from cpl_translation import TranslatePipe
from discord.ext import commands
from discord.ext.commands import Context
from bot_core.abc.message_service_abc import MessageServiceABC
from bot_core.helper.command_checks import CommandChecks
from bot_core.logging.command_logger import CommandLogger
class PresenceCommand(DiscordCommandABC):
def __init__(
self,
logger: CommandLogger,
message_service: MessageServiceABC,
bot: DiscordBotServiceABC,
translate: TranslatePipe,
):
DiscordCommandABC.__init__(self)
self._logger = logger
self._message_service = message_service
self._bot = bot
self._t = translate
@commands.hybrid_command()
@commands.guild_only()
@CommandChecks.check_is_ready()
@CommandChecks.check_is_member_moderator()
async def presence(self, ctx: Context, text: str = ""):
self._logger.debug(__name__, f"Received command presence {ctx}")
if text == "":
await self._bot.change_presence(activity=None)
await self._message_service.send_ctx_msg(
ctx, self._t.transform("modules.base.presence.removed")
)
return
if len(text) > 128:
await self._message_service.send_ctx_msg(
ctx, self._t.transform("modules.base.presence.max_char_count_exceeded")
)
return
await self._bot.change_presence(activity=discord.Game(name=text))
await self._message_service.send_ctx_msg(
ctx, self._t.transform("modules.base.presence.changed")
)
self._logger.trace(__name__, f"Finished presence command")

View File

@@ -0,0 +1,60 @@
import asyncio
from cpl_core.configuration import ConfigurationABC
from cpl_discord.command import DiscordCommandABC
from cpl_translation import TranslatePipe
from discord.ext import commands
from discord.ext.commands import Context
from bot_core.abc.client_utils_abc import ClientUtilsABC
from bot_core.abc.message_service_abc import MessageServiceABC
from bot_core.helper.command_checks import CommandChecks
from bot_core.logging.command_logger import CommandLogger
from bot_data.model.server_config import ServerConfig
from modules.permission.abc.permission_service_abc import PermissionServiceABC
class PurgeCommand(DiscordCommandABC):
def __init__(
self,
logger: CommandLogger,
config: ConfigurationABC,
message_service: MessageServiceABC,
permissions: PermissionServiceABC,
client_utils: ClientUtilsABC,
translate: TranslatePipe,
):
DiscordCommandABC.__init__(self)
self._logger = logger
self._config = config
self._message_service = message_service
self._permissions = permissions
self._client_utils = client_utils
self._t = translate
self._logger.trace(__name__, f"Loaded command service: {type(self).__name__}")
@commands.hybrid_command()
@commands.guild_only()
@CommandChecks.check_is_ready()
@CommandChecks.check_is_member_moderator()
async def purge(self, ctx: Context):
self._logger.debug(__name__, f"Received command purge {ctx}")
server_settings: ServerConfig = self._config.get_configuration(
f"ServerConfig_{ctx.guild.id}"
)
await self._message_service.send_ctx_msg(
ctx, self._t.transform("modules.moderator.purge_message")
)
await asyncio.sleep(server_settings.message_delete_timer)
try:
await ctx.channel.purge()
except Exception as e:
self._logger.error(__name__, f"Cannot purge channel {ctx.channel.id}", e)
await self._message_service.send_ctx_msg(
ctx, self._t.transform("common.bot_has_no_permission_message")
)
self._logger.trace(__name__, f"Finished purge command")

View File

@@ -0,0 +1,116 @@
from typing import List as TList
import discord
import requests
from cpl_core.database.context import DatabaseContextABC
from cpl_discord.command import DiscordCommandABC
from cpl_discord.service import DiscordBotServiceABC
from cpl_translation import TranslatePipe
from discord import app_commands
from discord.ext import commands
from discord.ext.commands import Context
from bot_core.abc.client_utils_abc import ClientUtilsABC
from bot_core.abc.message_service_abc import MessageServiceABC
from bot_core.helper.command_checks import CommandChecks
from bot_core.logging.command_logger import CommandLogger
from bot_data.abc.game_server_repository_abc import GameServerRepositoryABC
from bot_data.abc.server_repository_abc import ServerRepositoryABC
from bot_data.abc.user_game_ident_repository_abc import UserGameIdentRepositoryABC
from bot_data.abc.user_repository_abc import UserRepositoryABC
from bot_data.model.user_game_ident import UserGameIdent
class RegisterGroup(DiscordCommandABC):
def __init__(
self,
logger: CommandLogger,
message_service: MessageServiceABC,
bot: DiscordBotServiceABC,
client_utils: ClientUtilsABC,
servers: ServerRepositoryABC,
users: UserRepositoryABC,
game_server: GameServerRepositoryABC,
user_game_ident: UserGameIdentRepositoryABC,
db: DatabaseContextABC,
t: TranslatePipe,
):
DiscordCommandABC.__init__(self)
self._logger = logger
self._message_service = message_service
self._bot = bot
self._client_utils = client_utils
self._servers = servers
self._users = users
self._game_server = game_server
self._user_game_ident = user_game_ident
self._db = db
self._t = t
self._logger.trace(__name__, f"Loaded command service: {type(self).__name__}")
@commands.hybrid_group()
@commands.guild_only()
async def register(self, ctx: Context):
pass
@register.command()
@commands.guild_only()
@CommandChecks.check_is_ready()
@CommandChecks.check_is_member_moderator()
async def minecraft(
self, ctx: Context, member: discord.Member, game_server: int, name: str
):
self._logger.debug(__name__, f"Received command register minecraft {ctx}")
minecraft_id = None
try:
self._logger.debug(__name__, f"Try to get minecraft id for {name}")
response = requests.get(
url=f"https://api.mojang.com/users/profiles/minecraft/{name}"
)
if len(response.content) == 0:
await self._message_service.send_interaction_msg(
ctx.interaction,
self._t.transform("modules.base.register.not_found"),
)
return
minecraft_id = response.json()["id"]
except Exception as e:
self._logger.error(__name__, f"Get minecraft id for {name} failed", e)
await self._message_service.send_interaction_msg(
ctx.interaction, self._t.transform("modules.base.register.not_found")
)
if minecraft_id is None:
return
server = self._servers.get_server_by_discord_id(ctx.guild.id)
user = self._users.get_user_by_discord_id_and_server_id(member.id, server.id)
gs = self._game_server.get_game_server_by_id(game_server)
game_ident = UserGameIdent(user, gs, minecraft_id)
self._user_game_ident.add_user_game_ident(game_ident)
self._db.save_changes()
await self._message_service.send_interaction_msg(
ctx.interaction, self._t.transform("modules.base.register.success")
)
self._logger.trace(__name__, f"Finished register minecraft command")
@minecraft.autocomplete("game_server")
async def game_server_autocomplete(
self, interaction: discord.Interaction, current: str
) -> TList[app_commands.Choice[str]]:
server = self._servers.get_server_by_discord_id(interaction.guild.id)
game_servers = self._game_server.get_game_servers_by_server_id(server.id)
return [
app_commands.Choice(name=gs.name, value=gs.id)
for gs in self._client_utils.get_auto_complete_list(
game_servers, current, lambda x: x.name
)
]

View File

@@ -0,0 +1,44 @@
from cpl_core.dependency_injection import ServiceProviderABC
from cpl_core.logging import LoggerABC
from cpl_discord.command import DiscordCommandABC
from cpl_discord.service import DiscordBotServiceABC
from discord.ext import commands
from discord.ext.commands import Context
from bot_core.helper.command_checks import CommandChecks
from modules.base.forms.bug_report_form import BugReportForm
from modules.base.forms.complaint_form import ComplaintForm
class SubmitGroup(DiscordCommandABC):
def __init__(
self, services: ServiceProviderABC, logger: LoggerABC, bot: DiscordBotServiceABC
):
DiscordCommandABC.__init__(self)
self._services = services
self._logger = logger
self._bot = bot
@commands.hybrid_group()
@commands.guild_only()
async def submit(self, ctx: Context):
pass
@submit.command()
@commands.guild_only()
@CommandChecks.check_is_ready()
async def complaint(self, ctx: Context):
self._logger.debug(__name__, f"Received command complaint {ctx}")
complaint_form: ComplaintForm = self._services.get_service(ComplaintForm)
await ctx.interaction.response.send_modal(complaint_form)
self._logger.trace(__name__, f"Finished command complaint {ctx}")
@submit.command()
@commands.guild_only()
@CommandChecks.check_is_ready()
async def bug_report(self, ctx: Context):
self._logger.debug(__name__, f"Received command complaint {ctx}")
bug_form: BugReportForm = self._services.get_service(BugReportForm)
await ctx.interaction.response.send_modal(bug_form)
self._logger.trace(__name__, f"Finished command complaint {ctx}")

View File

@@ -0,0 +1,90 @@
from typing import List as TList
import discord
from cpl_core.database.context import DatabaseContextABC
from cpl_discord.command import DiscordCommandABC
from cpl_discord.service import DiscordBotServiceABC
from cpl_translation import TranslatePipe
from discord import app_commands
from discord.ext import commands
from discord.ext.commands import Context
from bot_core.abc.client_utils_abc import ClientUtilsABC
from bot_core.abc.message_service_abc import MessageServiceABC
from bot_core.helper.command_checks import CommandChecks
from bot_core.logging.command_logger import CommandLogger
from bot_data.abc.game_server_repository_abc import GameServerRepositoryABC
from bot_data.abc.server_repository_abc import ServerRepositoryABC
from bot_data.abc.user_game_ident_repository_abc import UserGameIdentRepositoryABC
from bot_data.abc.user_repository_abc import UserRepositoryABC
class UnregisterGroup(DiscordCommandABC):
def __init__(
self,
logger: CommandLogger,
message_service: MessageServiceABC,
bot: DiscordBotServiceABC,
client_utils: ClientUtilsABC,
servers: ServerRepositoryABC,
users: UserRepositoryABC,
game_server: GameServerRepositoryABC,
user_game_idents: UserGameIdentRepositoryABC,
db: DatabaseContextABC,
t: TranslatePipe,
):
DiscordCommandABC.__init__(self)
self._logger = logger
self._message_service = message_service
self._bot = bot
self._client_utils = client_utils
self._servers = servers
self._users = users
self._game_server = game_server
self._user_game_idents = user_game_idents
self._db = db
self._t = t
self._logger.trace(__name__, f"Loaded command service: {type(self).__name__}")
# @commands.hybrid_group()
# @commands.guild_only()
# async def unregister(self, ctx: Context):
# pass
@commands.hybrid_command()
@commands.guild_only()
@CommandChecks.check_is_ready()
@CommandChecks.check_is_member_moderator()
async def unregister(self, ctx: Context, member: discord.Member, game_server: int):
self._logger.debug(__name__, f"Received command unregister {ctx}")
server = self._servers.get_server_by_discord_id(ctx.guild.id)
user = self._users.get_user_by_discord_id_and_server_id(member.id, server.id)
ident = user.game_idents.where(
lambda x: x.game_server.id == game_server
).single()
self._user_game_idents.delete_user_game_ident(ident)
self._users.update_user(user)
self._db.save_changes()
await self._message_service.send_interaction_msg(
ctx.interaction, self._t.transform("modules.base.unregister.success")
)
self._logger.trace(__name__, f"Finished unregister command")
@unregister.autocomplete("game_server")
async def game_server_autocomplete(
self, interaction: discord.Interaction, current: str
) -> TList[app_commands.Choice[str]]:
server = self._servers.get_server_by_discord_id(interaction.guild.id)
game_servers = self._game_server.get_game_servers_by_server_id(server.id)
return [
app_commands.Choice(name=gs.name, value=gs.id)
for gs in self._client_utils.get_auto_complete_list(
game_servers, current, lambda x: x.name
)
]

View File

@@ -0,0 +1,563 @@
import datetime
from typing import Optional, List as TList
import discord
from cpl_core.configuration import ConfigurationABC
from cpl_core.database.context import DatabaseContextABC
from cpl_discord.command import DiscordCommandABC
from cpl_discord.service import DiscordBotServiceABC
from cpl_query.extension import List
from cpl_translation import TranslatePipe
from discord import app_commands
from discord.ext import commands
from discord.ext.commands import Context
from bot_core.abc.client_utils_abc import ClientUtilsABC
from bot_core.abc.message_service_abc import MessageServiceABC
from bot_core.helper.command_checks import CommandChecks
from bot_core.logging.command_logger import CommandLogger
from bot_core.pipes.date_time_offset_pipe import DateTimeOffsetPipe
from bot_data.abc.server_repository_abc import ServerRepositoryABC
from bot_data.abc.user_joined_server_repository_abc import UserJoinedServerRepositoryABC
from bot_data.abc.user_joined_voice_channel_repository_abc import (
UserJoinedVoiceChannelRepositoryABC,
)
from bot_data.abc.user_repository_abc import UserRepositoryABC
from bot_data.abc.user_warnings_repository_abc import UserWarningsRepositoryABC
from bot_data.model.server_config import ServerConfig
from modules.base.service.user_warnings_service import UserWarningsService
from modules.level.service.level_service import LevelService
from modules.permission.abc.permission_service_abc import PermissionServiceABC
class UserGroup(DiscordCommandABC):
def __init__(
self,
config: ConfigurationABC,
logger: CommandLogger,
message_service: MessageServiceABC,
bot: DiscordBotServiceABC,
client_utils: ClientUtilsABC,
permissions: PermissionServiceABC,
servers: ServerRepositoryABC,
db: DatabaseContextABC,
users: UserRepositoryABC,
user_joined_servers: UserJoinedServerRepositoryABC,
user_joined_voice_channel: UserJoinedVoiceChannelRepositoryABC,
translate: TranslatePipe,
date: DateTimeOffsetPipe,
level: LevelService,
user_warnings: UserWarningsRepositoryABC,
user_warnings_service: UserWarningsService,
):
DiscordCommandABC.__init__(self)
self._config = config
self._logger = logger
self._message_service = message_service
self._bot = bot
self._client_utils = client_utils
self._permissions = permissions
self._servers = servers
self._db = db
self._users = users
self._user_joined_servers = user_joined_servers
self._user_joined_voice_channel = user_joined_voice_channel
self._t = translate
self._date = date
self._level = level
self._user_warnings = user_warnings
self._user_warnings_service = user_warnings_service
self._logger.trace(__name__, f"Loaded command service: {type(self).__name__}")
self._atr_dict = {
"xp": self._t.transform("modules.base.user.atr.xp"),
"ontime": self._t.transform("modules.base.user.atr.ontime"),
}
self._atr_TList = [(key, self._atr_dict[key]) for key in self._atr_dict]
async def _handle_atr_calc(
self,
ctx: Context,
atr: str,
value: int,
member: discord.Member,
is_remove=False,
):
if member is None or not isinstance(member, discord.Member):
member = ctx.author
server = self._servers.find_server_by_discord_id(ctx.guild.id)
user = self._users.find_user_by_discord_id_and_server_id(member.id, server.id)
if atr == "xp":
if is_remove:
user.xp -= value
else:
user.xp += value
self._users.update_user(user)
self._db.save_changes()
await self._level.check_level(member)
else:
await self._message_service.send_interaction_msg(
ctx.interaction,
self._t.transform("modules.base.user.error.atr_not_found").format(atr),
)
return
type = "add" if not is_remove else "remove"
await self._message_service.send_interaction_msg(
ctx.interaction,
self._t.transform(f"modules.base.user.{type}.{atr.lower()}").format(
atr, value, member.mention
),
)
@commands.hybrid_group()
@commands.guild_only()
async def user(self, ctx: Context):
pass
@user.command()
@commands.guild_only()
@CommandChecks.check_is_ready()
async def birthday(self, ctx: Context, day: int, month: int, year: int):
self._logger.debug(
__name__,
f"Received command user birthday {ctx}:{ctx.author},{day}:{month}:{year}",
)
date = datetime.date(year, month, day)
server = self._servers.get_server_by_discord_id(ctx.guild.id)
user = self._users.get_user_by_discord_id_and_server_id(
ctx.author.id, server.id
)
user.birthday = date
self._users.update_user(user)
self._db.save_changes()
await self._message_service.send_interaction_msg(
ctx.interaction, self._t.transform("modules.base.user.birthday.success")
)
# notify team to prevent multiple entries every day
settings: ServerConfig = self._config.get_configuration(
f"ServerConfig_{ctx.guild.id}"
)
channel = ctx.guild.get_channel(settings.team_channel_id)
await self._message_service.send_channel_message(
channel,
self._t.transform("modules.base.user.birthday.success_team").format(
ctx.author.mention, date
),
is_persistent=True,
)
self._logger.trace(__name__, f"Finished user-info command")
@birthday.autocomplete("day")
async def birthday_autocomplete(
self, interaction: discord.Interaction, current: str
) -> TList[app_commands.Choice[int]]:
days = List(int, range(0, 32)).select(lambda x: str(x))
return [
app_commands.Choice(name=date, value=date)
for date in self._client_utils.get_auto_complete_list(days, current)
]
@birthday.autocomplete("month")
async def birthday_autocomplete(
self, interaction: discord.Interaction, current: str
) -> TList[app_commands.Choice[int]]:
days = List(int, range(0, 13)).select(lambda x: str(x))
return [
app_commands.Choice(name=date, value=date)
for date in self._client_utils.get_auto_complete_list(days, current)
]
@birthday.autocomplete("year")
async def birthday_autocomplete(
self, interaction: discord.Interaction, current: str
) -> TList[app_commands.Choice[int]]:
today = datetime.date.today()
days = List(int, range(today.year - 75, today.year)).select(lambda x: str(x))
return [
app_commands.Choice(name=date, value=date)
for date in self._client_utils.get_auto_complete_list(days, current)
]
@user.command()
@commands.guild_only()
@CommandChecks.check_is_ready()
async def info(
self, ctx: Context, member: Optional[discord.Member] = None, *, wait: int = None
):
self._logger.debug(
__name__, f"Received command user-info {ctx}:{member},{wait}"
)
is_mod = self._permissions.is_member_moderator(ctx.author)
if member is not None and not is_mod:
await self._message_service.send_ctx_msg(
ctx, self._t.transform("common.no_permission_message")
)
return
if member is None or not isinstance(member, discord.Member):
member = ctx.author
server = self._servers.find_server_by_discord_id(ctx.guild.id)
user = self._users.find_user_by_discord_id_and_server_id(member.id, server.id)
joins = self._user_joined_servers.get_user_joined_servers_by_user_id(user.id)
embed = discord.Embed(
title=member.name, description=member.name, color=int("ef9d0d", 16)
)
embed.add_field(
name=self._t.transform("modules.base.user.atr.id"), value=member.id
)
embed.add_field(
name=self._t.transform("modules.base.user.atr.name"), value=member.name
)
embed.add_field(
name=self._t.transform("modules.base.user.atr.discord_join"),
value=self._date.transform(member.created_at),
inline=False,
)
embed.add_field(
name=self._t.transform("modules.base.user.atr.last_join"),
value=self._date.transform(member.joined_at),
inline=False,
)
embed.add_field(
name=self._t.transform("modules.base.user.atr.xp"), value=str(user.xp)
)
embed.add_field(
name=self._t.transform("modules.base.user.atr.ontime"),
value=str(self._client_utils.get_ontime_for_user(user)),
)
roles = ""
for role in member.roles:
roles += f"{role.name}\n"
embed.add_field(
name=self._t.transform("modules.base.user.atr.roles"),
value=roles,
inline=False,
)
if is_mod or member == ctx.author:
joins_string = ""
for join in joins:
joins_string += f"{self._date.transform(join.joined_on)}\n"
embed.add_field(
name=self._t.transform("modules.base.user.atr.joins"),
value=joins_string,
)
if is_mod or member == ctx.author:
lefts_string = ""
for join in joins:
if join.leaved_on is None:
if lefts_string == "":
lefts_string = "/"
continue
lefts_string += f"{self._date.transform(join.leaved_on)}\n"
embed.add_field(
name=self._t.transform("modules.base.user.atr.lefts"),
value=lefts_string,
)
if is_mod or member == ctx.author:
warnings_string = ""
for warning in self._user_warnings.get_user_warnings_by_user_id(user.id):
warnings_string += f"{warning.id} - {warning.description}\n"
embed.add_field(
name=self._t.transform("modules.base.user.atr.warnings"),
value=warnings_string,
inline=False,
)
# send to interaction because of sensitive data
await self._message_service.send_interaction_msg(
ctx.interaction, embed, wait_before_delete=wait
)
self._logger.trace(__name__, f"Finished user-info command")
@user.command()
@commands.guild_only()
@CommandChecks.check_is_ready()
async def get(self, ctx: Context, atr: str, member: discord.Member = None):
self._logger.debug(__name__, f"Received command user-get {atr} {ctx}:{member}")
is_mod = self._permissions.is_member_moderator(ctx.author)
if member is not None and not is_mod:
await self._message_service.send_ctx_msg(
ctx, self._t.transform("common.no_permission_message")
)
return
if member is None or not isinstance(member, discord.Member):
member = ctx.author
server = self._servers.find_server_by_discord_id(ctx.guild.id)
user = self._users.find_user_by_discord_id_and_server_id(member.id, server.id)
if atr == "xp":
value = str(user.xp)
elif atr == "ontime":
value = str(self._client_utils.get_ontime_for_user(user))
else:
await self._message_service.send_interaction_msg(
ctx.interaction,
self._t.transform("modules.base.user.error.atr_not_found").format(atr),
)
return
await self._message_service.send_interaction_msg(
ctx.interaction,
self._t.transform(f"modules.base.user.get.{atr.lower()}").format(
member.mention, value
),
)
@get.autocomplete("atr")
async def get_autocomplete(
self, interaction: discord.Interaction, current: str
) -> TList[app_commands.Choice[str]]:
return [
app_commands.Choice(name=value, value=key) for key, value in self._atr_TList
]
@user.command()
@commands.guild_only()
@CommandChecks.check_is_ready()
@CommandChecks.check_is_member_moderator()
async def set(
self, ctx: Context, atr: str, value: int, member: discord.Member = None
):
self._logger.debug(__name__, f"Received command user-set {atr} {ctx}:{member}")
if member is None or not isinstance(member, discord.Member):
member = ctx.author
server = self._servers.find_server_by_discord_id(ctx.guild.id)
user = self._users.find_user_by_discord_id_and_server_id(member.id, server.id)
if atr == "xp":
try:
user.xp = value
except TypeError as te:
self._logger.error(
__name__, f"String value couldn't be converted to int", te
)
await self._message_service.send_interaction_msg(
ctx.interaction,
self._t.transform("modules.base.user.set.error.type_error"),
)
return
else:
self._users.update_user(user)
self._db.save_changes()
await self._level.check_level(member)
else:
await self._message_service.send_interaction_msg(
ctx.interaction,
self._t.transform("modules.base.user.error.atr_not_found").format(atr),
)
return
await self._message_service.send_interaction_msg(
ctx.interaction,
self._t.transform(f"modules.base.user.set.{atr.lower()}").format(
member.mention, value
),
)
@set.autocomplete("atr")
async def set_autocomplete(
self, interaction: discord.Interaction, current: str
) -> TList[app_commands.Choice[str]]:
atr_TList = [("xp", self._atr_dict["xp"])]
return [app_commands.Choice(name=value, value=key) for key, value in atr_TList]
@user.command()
@commands.guild_only()
@CommandChecks.check_is_ready()
@CommandChecks.check_is_member_moderator()
async def add(
self, ctx: Context, atr: str, value: int, member: discord.Member = None
):
self._logger.debug(
__name__, f"Received command user-add {atr}-={value} {ctx}:{member}"
)
await self._handle_atr_calc(ctx, atr, value, member)
@add.autocomplete("atr")
async def set_autocomplete(
self, interaction: discord.Interaction, current: str
) -> TList[app_commands.Choice[str]]:
atr_TList = [("xp", self._atr_dict["xp"])]
return [app_commands.Choice(name=value, value=key) for key, value in atr_TList]
@user.command()
@commands.guild_only()
@CommandChecks.check_is_ready()
@CommandChecks.check_is_member_moderator()
async def remove(
self, ctx: Context, atr: str, value: int, member: discord.Member = None
):
self._logger.debug(
__name__, f"Received command user-remove {atr}-={value} {ctx}:{member}"
)
await self._handle_atr_calc(ctx, atr, value, member, is_remove=True)
@remove.autocomplete("atr")
async def set_autocomplete(
self, interaction: discord.Interaction, current: str
) -> TList[app_commands.Choice[str]]:
atr_TList = [("xp", self._atr_dict["xp"])]
return [app_commands.Choice(name=value, value=key) for key, value in atr_TList]
@user.command()
@commands.guild_only()
@CommandChecks.check_is_ready()
@CommandChecks.check_is_member_moderator()
async def reset(self, ctx: Context, atr: str, member: discord.Member = None):
self._logger.debug(
__name__, f"Received command user-reset {atr} {ctx}:{member}"
)
if member is None or not isinstance(member, discord.Member):
member = ctx.author
server = self._servers.find_server_by_discord_id(ctx.guild.id)
user = self._users.find_user_by_discord_id_and_server_id(member.id, server.id)
if atr == "xp":
user.xp = 0
self._users.update_user(user)
self._db.save_changes()
await self._level.check_level(member)
elif atr == "ontime":
self._user_joined_voice_channel.delete_user_joined_voice_channel_by_user_id(
user.id
)
self._db.save_changes()
else:
await self._message_service.send_interaction_msg(
ctx.interaction,
self._t.transform("modules.base.user.error.atr_not_found").format(atr),
)
return
await self._message_service.send_interaction_msg(
ctx.interaction,
self._t.transform(f"modules.base.user.reset.{atr.lower()}").format(
atr, member.mention
),
)
@reset.autocomplete("atr")
async def reset_autocomplete(
self, interaction: discord.Interaction, current: str
) -> TList[app_commands.Choice[str]]:
return [
app_commands.Choice(name=value, value=key) for key, value in self._atr_TList
]
@user.group()
@commands.guild_only()
async def warning(self, ctx: Context):
pass
@warning.command()
@commands.guild_only()
@CommandChecks.check_is_ready()
@CommandChecks.check_is_member_moderator()
async def show(self, ctx: Context, member: discord.Member, wait: int = None):
self._logger.debug(
__name__, f"Received command user warning show {ctx}:{member}"
)
server = self._servers.find_server_by_discord_id(ctx.guild.id)
user = self._users.find_user_by_discord_id_and_server_id(member.id, server.id)
embed = discord.Embed(
title=member.name,
description=self._t.transform("modules.base.user.atr.warnings"),
color=int("ef9d0d", 16),
)
warnings = self._user_warnings.get_user_warnings_by_user_id(user.id)
warnings_id_string = ""
for warning in warnings:
warnings_id_string += f"{warning.id}\n"
warnings_description_string = ""
for warning in warnings:
warnings_description_string += f"{warning.description}\n"
embed.add_field(
name=self._t.transform("modules.base.warnings.show.id"),
value=warnings_id_string,
inline=True,
)
embed.add_field(
name=self._t.transform("modules.base.warnings.show.description"),
value=warnings_description_string,
inline=True,
)
await self._message_service.send_interaction_msg(
ctx.interaction, embed, wait_before_delete=wait
)
self._logger.trace(__name__, f"Finished user warning show command")
@warning.command()
@commands.guild_only()
@CommandChecks.check_is_ready()
@CommandChecks.check_is_member_moderator()
async def add(self, ctx: Context, member: discord.Member, description: str):
self._logger.debug(
__name__, f"Received command user warning add {ctx}:{member},{description}"
)
try:
await self._user_warnings_service.add_warnings_async(
member, description, ctx.author.id
)
await self._message_service.send_ctx_msg(
ctx, self._t.transform("modules.base.warnings.add.success")
)
except Exception as e:
self._logger.error(__name__, f"Adding user warning failed", e)
await self._message_service.send_ctx_msg(
ctx, self._t.transform("modules.base.warnings.add.failed")
)
self._logger.trace(__name__, f"Finished user warning add command")
@warning.command()
@commands.guild_only()
@CommandChecks.check_is_ready()
@CommandChecks.check_is_member_moderator()
async def remove(self, ctx: Context, warning_id: int):
self._logger.debug(
__name__, f"Received command user warning remove {ctx}:{warning_id}"
)
try:
await self._user_warnings_service.remove_warnings_async(warning_id)
await self._message_service.send_ctx_msg(
ctx, self._t.transform("modules.base.warnings.remove.success")
)
except Exception as e:
self._logger.error(__name__, f"Removing user warning failed", e)
await self._message_service.send_ctx_msg(
ctx, self._t.transform("modules.base.warnings.remove.failed")
)
self._logger.trace(__name__, f"Finished user warning remove command")

View File

@@ -0,0 +1,26 @@
# -*- coding: utf-8 -*-
"""
bot sh-edraft.de Discord bot
~~~~~~~~~~~~~~~~~~~
Discord bot for customers of sh-edraft.de
:copyright: (c) 2022 - 2023 sh-edraft.de
:license: MIT, see LICENSE for more details.
"""
__title__ = "modules.base.events"
__author__ = "Sven Heidemann"
__license__ = "MIT"
__copyright__ = "Copyright (c) 2022 - 2023 sh-edraft.de"
__version__ = "1.2.0"
from collections import namedtuple
# imports:
VersionInfo = namedtuple("VersionInfo", "major minor micro")
version_info = VersionInfo(major="1", minor="2", micro="0")

View File

@@ -0,0 +1,279 @@
import datetime
import uuid
from cpl_core.logging import LoggerABC
from cpl_core.time import TimeFormatSettings
from cpl_discord.events.on_command_error_abc import OnCommandErrorABC
from cpl_discord.service import DiscordBotServiceABC
from cpl_translation import TranslatePipe
from discord.ext import commands
from discord.ext.commands import Context, CommandError
from bot_core.abc.message_service_abc import MessageServiceABC
from bot_core.exception.check_error import CheckError
from bot_data.model.technician_config import TechnicianConfig
class BaseOnCommandErrorEvent(OnCommandErrorABC):
def __init__(
self,
logger: LoggerABC,
bot: DiscordBotServiceABC,
messenger: MessageServiceABC,
tech_settings: TechnicianConfig,
time_format_settings: TimeFormatSettings,
translate: TranslatePipe,
):
OnCommandErrorABC.__init__(self)
self._logger = logger
self._bot = bot
self._messenger = messenger
self._tech_settings = tech_settings
self._time_format_settings = time_format_settings
self._t = translate
async def on_command_error(self, ctx: Context, error: CommandError):
if isinstance(error, CheckError):
return
error = getattr(error, "original", error)
uid = uuid.uuid4()
self._logger.error(__name__, f"Got error: {type(error).__name__} UID: {uid}")
if isinstance(error, commands.MissingRequiredArgument):
await self._messenger.send_ctx_msg(
ctx,
self._t.transform("common.errors.missing_required_argument"),
without_tracking=True,
)
elif isinstance(error, commands.ArgumentParsingError):
await self._messenger.send_ctx_msg(
ctx,
self._t.transform("common.errors.argument_parsing_error"),
without_tracking=True,
)
elif isinstance(error, commands.UnexpectedQuoteError):
await self._messenger.send_ctx_msg(
ctx,
self._t.transform("common.errors.unexpected_quote_error"),
without_tracking=True,
)
elif isinstance(error, commands.InvalidEndOfQuotedStringError):
await self._messenger.send_ctx_msg(
ctx,
self._t.transform("common.errors.invalid_end_of_quoted_string_error"),
without_tracking=True,
)
elif isinstance(error, commands.ExpectedClosingQuoteError):
await self._messenger.send_ctx_msg(
ctx,
self._t.transform("common.errors.expected_closing_quote_error"),
without_tracking=True,
)
elif isinstance(error, commands.BadArgument):
await self._messenger.send_ctx_msg(
ctx,
self._t.transform("common.errors.bad_argument"),
without_tracking=True,
)
elif isinstance(error, commands.BadUnionArgument):
await self._messenger.send_ctx_msg(
ctx,
self._t.transform("common.errors.bad_union_argument"),
without_tracking=True,
)
elif isinstance(error, commands.PrivateMessageOnly):
await self._messenger.send_ctx_msg(
ctx,
self._t.transform("common.errors.private_message_only"),
without_tracking=True,
)
elif isinstance(error, commands.NoPrivateMessage):
await self._messenger.send_ctx_msg(
ctx,
self._t.transform("common.errors.no_private_message"),
without_tracking=True,
)
elif isinstance(error, commands.CheckFailure):
await self._messenger.send_ctx_msg(
ctx,
self._t.transform("common.errors.check_failure"),
without_tracking=True,
)
elif isinstance(error, commands.CheckAnyFailure):
await self._messenger.send_ctx_msg(
ctx,
self._t.transform("common.errors.check_any_failure"),
without_tracking=True,
)
elif isinstance(error, commands.CommandNotFound):
await self._messenger.send_ctx_msg(
ctx,
self._t.transform("common.errors.command_not_found"),
without_tracking=True,
)
elif isinstance(error, commands.DisabledCommand):
await self._messenger.send_ctx_msg(
ctx,
self._t.transform("common.errors.disabled_command"),
without_tracking=True,
)
elif isinstance(error, commands.CommandInvokeError):
await self._messenger.send_ctx_msg(
ctx,
self._t.transform("common.errors.command_invoke_error"),
without_tracking=True,
)
elif isinstance(error, commands.TooManyArguments):
await self._messenger.send_ctx_msg(
ctx,
self._t.transform("common.errors.too_many_arguments"),
without_tracking=True,
)
elif isinstance(error, commands.UserInputError):
await self._messenger.send_ctx_msg(
ctx,
self._t.transform("common.errors.user_input_error"),
without_tracking=True,
)
elif isinstance(error, commands.CommandOnCooldown):
await self._messenger.send_ctx_msg(
ctx,
self._t.transform("common.errors.command_on_cooldown"),
without_tracking=True,
)
elif isinstance(error, commands.MaxConcurrencyReached):
await self._messenger.send_ctx_msg(
ctx,
self._t.transform("common.errors.max_concurrency_reached"),
without_tracking=True,
)
elif isinstance(error, commands.NotOwner):
await self._messenger.send_ctx_msg(
ctx, self._t.transform("common.errors.not_owner"), without_tracking=True
)
elif isinstance(error, commands.MissingPermissions):
await self._messenger.send_ctx_msg(
ctx,
self._t.transform("common.errors.missing_permissions"),
without_tracking=True,
)
elif isinstance(error, commands.BotMissingPermissions):
await self._messenger.send_ctx_msg(
ctx,
self._t.transform("common.errors.bot_missing_permissions"),
without_tracking=True,
)
elif isinstance(error, commands.MissingRole):
await self._messenger.send_ctx_msg(
ctx,
self._t.transform("common.errors.missing_role"),
without_tracking=True,
)
elif isinstance(error, commands.BotMissingRole):
await self._messenger.send_ctx_msg(
ctx,
self._t.transform("common.errors.bot_missing_role"),
without_tracking=True,
)
elif isinstance(error, commands.MissingAnyRole):
await self._messenger.send_ctx_msg(
ctx,
self._t.transform("common.errors.missing_any_role"),
without_tracking=True,
)
elif isinstance(error, commands.BotMissingAnyRole):
await self._messenger.send_ctx_msg(
ctx,
self._t.transform("common.errors.bot_missing_any_role"),
without_tracking=True,
)
elif isinstance(error, commands.NSFWChannelRequired):
await self._messenger.send_ctx_msg(
ctx,
self._t.transform("common.errors.nsfw_channel_required"),
without_tracking=True,
)
elif isinstance(error, commands.ExtensionError):
await self._messenger.send_ctx_msg(
ctx,
self._t.transform("common.errors.extension_error"),
without_tracking=True,
)
elif isinstance(error, commands.ExtensionAlreadyLoaded):
await self._messenger.send_ctx_msg(
ctx,
self._t.transform("common.errors.extension_already_loaded"),
without_tracking=True,
)
elif isinstance(error, commands.ExtensionNotLoaded):
await self._messenger.send_ctx_msg(
ctx,
self._t.transform("common.errors.extension_not_loaded"),
without_tracking=True,
)
elif isinstance(error, commands.NoEntryPointError):
await self._messenger.send_ctx_msg(
ctx,
self._t.transform("common.errors.no_entry_point_error"),
without_tracking=True,
)
elif isinstance(error, commands.ExtensionFailed):
await self._messenger.send_ctx_msg(
ctx,
self._t.transform("common.errors.extension_failed"),
without_tracking=True,
)
else:
await self._messenger.send_ctx_msg(
ctx,
self._t.transform("common.errors.command_error"),
without_tracking=True,
)
message = self._t.transform(
"modules.base.technician_command_error_message"
).format(
ctx.command,
ctx.author,
error,
datetime.datetime.now().strftime(
self._time_format_settings.date_time_format
),
uid,
)
for t in self._tech_settings.technician_ids:
member = self._bot.get_user(t)
await self._messenger.send_dm_message(
message, member, without_tracking=True
)

View File

@@ -0,0 +1,96 @@
from typing import Optional
import discord
from cpl_core.configuration import ConfigurationABC
from cpl_core.database.context import DatabaseContextABC
from cpl_core.time import TimeFormatSettings
from cpl_discord.events import OnCommandABC
from cpl_discord.service import DiscordBotServiceABC
from cpl_translation import TranslatePipe
from discord.ext.commands import Context
from bot_core.abc.message_service_abc import MessageServiceABC
from bot_core.logging.command_logger import CommandLogger
from bot_data.abc.client_repository_abc import ClientRepositoryABC
from bot_data.abc.server_repository_abc import ServerRepositoryABC
from bot_data.abc.user_repository_abc import UserRepositoryABC
from bot_data.model.server_config import ServerConfig
from bot_data.model.user import User
class BaseOnCommandEvent(OnCommandABC):
def __init__(
self,
config: ConfigurationABC,
logger: CommandLogger,
bot: DiscordBotServiceABC,
messenger: MessageServiceABC,
time_format_settings: TimeFormatSettings,
translate: TranslatePipe,
db: DatabaseContextABC,
users: UserRepositoryABC,
clients: ClientRepositoryABC,
servers: ServerRepositoryABC,
):
OnCommandABC.__init__(self)
self._config = config
self._logger = logger
self._bot = bot
self._messenger = messenger
self._time_format_settings = time_format_settings
self._t = translate
self._db = db
self._users = users
self._clients = clients
self._servers = servers
def _append_received_command_count(self, g_id: int):
try:
self._clients.append_received_command_count(self._bot.user.id, g_id, 1)
self._db.save_changes()
except Exception as e:
self._logger.error(
__name__, f"Cannot edit client {self._bot.user.id}@{g_id}", e
)
def _handle_message_for_xp(self, message: discord.Message):
dc_user_id = message.author.id
try:
server = self._servers.get_server_by_discord_id(message.guild.id)
except Exception as e:
self._logger.error(__name__, f"Cannot get server {message.guild.id}", e)
return
user: Optional[User] = None
try:
user = self._users.get_user_by_discord_id_and_server_id(
dc_user_id, server.id
)
except Exception as e:
self._logger.error(__name__, f"Cannot get user {dc_user_id}", e)
return
if user is None:
self._logger.error(__name__, f"User not found {dc_user_id}")
return
settings: ServerConfig = self._config.get_configuration(
f"ServerConfig_{message.guild.id}"
)
old_xp = user.xp
user.xp += settings.xp_per_message
self._users.update_user(user)
self._db.save_changes()
self._logger.debug(
__name__, f"User {user} sent message. xp: from {old_xp} to {user.xp}"
)
async def on_command(self, ctx: Context):
self._logger.debug(__name__, f"Module {type(self)} started")
self._logger.info(
__name__, f"Received command: {ctx.command} from {ctx.channel}"
)
if ctx is None or ctx.guild is None:
return
self._append_received_command_count(ctx.guild.id)

View File

@@ -0,0 +1,38 @@
from cpl_core.database.context import DatabaseContextABC
from cpl_core.logging import LoggerABC
from cpl_discord.events import OnGuildJoinABC
from cpl_discord.service import DiscordBotServiceABC
from discord import Guild
from bot_data.abc.server_config_repository_abc import ServerConfigRepositoryABC
from bot_data.abc.server_repository_abc import ServerRepositoryABC
from bot_data.model.server import Server
from bot_data.service.seeder_service import SeederService
class BaseOnGuildJoinEvent(OnGuildJoinABC):
def __init__(
self,
logger: LoggerABC,
bot: DiscordBotServiceABC,
servers: ServerRepositoryABC,
server_config: ServerConfigRepositoryABC,
db: DatabaseContextABC,
seeder: SeederService,
):
OnGuildJoinABC.__init__(self)
self._logger = logger
self._bot = bot
self._servers = servers
self._server_config = server_config
self._db = db
self._seeder = seeder
async def on_guild_join(self, guild: Guild):
if self._servers.find_server_by_discord_id(guild.id) is None:
self._servers.add_server(Server(guild.id))
self._db.save_changes()
await self._seeder.seed()
self._logger.debug(__name__, "Seeded technician config")

View File

@@ -0,0 +1,124 @@
from datetime import datetime
from typing import Union
import discord
from cpl_core.configuration import ConfigurationABC
from cpl_core.database.context import DatabaseContextABC
from cpl_core.logging import LoggerABC
from cpl_discord.events import OnMemberJoinABC
from cpl_discord.service import DiscordBotServiceABC
from cpl_translation import TranslatePipe
from bot_core.abc.client_utils_abc import ClientUtilsABC
from bot_core.abc.message_service_abc import MessageServiceABC
from bot_core.helper.event_checks import EventChecks
from bot_data.abc.known_user_repository_abc import KnownUserRepositoryABC
from bot_data.abc.server_repository_abc import ServerRepositoryABC
from bot_data.abc.user_joined_server_repository_abc import UserJoinedServerRepositoryABC
from bot_data.abc.user_repository_abc import UserRepositoryABC
from bot_data.model.known_user import KnownUser
from bot_data.model.server_config import ServerConfig
from bot_data.model.user import User
from bot_data.model.user_joined_server import UserJoinedServer
from modules.permission.abc.permission_service_abc import PermissionServiceABC
class BaseOnMemberJoinEvent(OnMemberJoinABC):
def __init__(
self,
config: ConfigurationABC,
logger: LoggerABC,
messenger: MessageServiceABC,
permissions: PermissionServiceABC,
db: DatabaseContextABC,
bot: DiscordBotServiceABC,
known_users: KnownUserRepositoryABC,
users: UserRepositoryABC,
servers: ServerRepositoryABC,
user_joins: UserJoinedServerRepositoryABC,
client_utils: ClientUtilsABC,
translate: TranslatePipe,
):
OnMemberJoinABC.__init__(self)
self._config = config
self._logger = logger
self._messenger = messenger
self._permission_service = permissions
self._db = db
self._bot = bot
self._known_users = known_users
self._users = users
self._servers = servers
self._user_joins = user_joins
self._client_utils = client_utils
self._t = translate
def _check_for_known_user(self, member: Union[discord.User, discord.Member]):
self._logger.debug(__name__, f"Check if user is already known {member}")
try:
user = self._known_users.find_user_by_discord_id(member.id)
if user is not None:
return
self._logger.debug(__name__, f"Add user: {member.id}")
self._known_users.add_user(KnownUser(member.id))
self._db.save_changes()
except Exception as e:
self._logger.error(__name__, f"Cannot get user {member.id}", e)
async def _add_if_not_exists_user_async(
self, member: Union[discord.User, discord.Member]
):
self._logger.debug(__name__, f"Check if user exists {member}")
await self._messenger.send_dm_message(
self._t.transform("modules.base.welcome_message").format(member.guild.name),
member,
)
try:
server = self._servers.get_server_by_discord_id(member.guild.id)
user = self._users.find_user_by_discord_id_and_server_id(
member.id, server.id
)
if user is not None:
self._user_joins.add_user_joined_server(
UserJoinedServer(user, datetime.now())
)
self._db.save_changes()
return
self._logger.debug(__name__, f"Add user: {member.id}")
self._users.add_user(User(member.id, 0, 0, 0, server))
self._db.save_changes()
user = self._users.get_user_by_discord_id_and_server_id(
member.id, server.id
)
self._user_joins.add_user_joined_server(
UserJoinedServer(user, datetime.now())
)
self._db.save_changes()
except Exception as e:
self._logger.error(__name__, f"Cannot get user {member.id}", e)
async def _notify_team(self, member: discord.Member):
self._logger.debug(__name__, f"Notify team that a member left")
settings: ServerConfig = self._config.get_configuration(
f"ServerConfig_{member.guild.id}"
)
channel = member.guild.get_channel(settings.team_channel_id)
await self._messenger.send_channel_message(
channel,
self._t.transform("modules.base.welcome_message_for_team").format(
member.mention
),
is_persistent=True,
)
self._logger.trace(__name__, f"Notified team that a member left")
@EventChecks.check_is_ready()
async def on_member_join(self, member: discord.Member):
self._logger.debug(__name__, f"Module {type(self)} started")
await self._client_utils.check_default_role(member)
self._check_for_known_user(member)
await self._add_if_not_exists_user_async(member)
await self._notify_team(member)

View File

@@ -0,0 +1,84 @@
from datetime import datetime
from typing import Union
import discord
from cpl_core.configuration import ConfigurationABC
from cpl_core.database.context import DatabaseContextABC
from cpl_core.logging import LoggerABC
from cpl_discord.events import OnMemberRemoveABC
from cpl_translation import TranslatePipe
from bot_core.abc.message_service_abc import MessageServiceABC
from bot_core.helper.event_checks import EventChecks
from bot_data.abc.server_repository_abc import ServerRepositoryABC
from bot_data.abc.user_joined_server_repository_abc import UserJoinedServerRepositoryABC
from bot_data.abc.user_repository_abc import UserRepositoryABC
from bot_data.model.server_config import ServerConfig
class BaseOnMemberRemoveEvent(OnMemberRemoveABC):
def __init__(
self,
config: ConfigurationABC,
logger: LoggerABC,
db: DatabaseContextABC,
message_service: MessageServiceABC,
users: UserRepositoryABC,
servers: ServerRepositoryABC,
user_joins: UserJoinedServerRepositoryABC,
translate: TranslatePipe,
):
OnMemberRemoveABC.__init__(self)
self._config = config
self._logger = logger
self._db = db
self._message_service = message_service
self._users = users
self._servers = servers
self._user_joins = user_joins
self._t = translate
async def _remove_user(self, member: Union[discord.User, discord.Member]):
self._logger.debug(__name__, f"Remove user {member}")
await self._message_service.send_dm_message(
self._t.transform("modules.base.goodbye_message"), member
)
try:
server = self._servers.get_server_by_discord_id(member.guild.id)
user = self._users.find_user_by_discord_id_and_server_id(
member.id, server.id
)
if user is None:
self._logger.error(__name__, f"Cannot find user {member}")
return
join = self._user_joins.get_active_user_joined_server_by_user_id(user.id)
join.leaved_on = datetime.now()
self._user_joins.update_user_joined_server(join)
self._db.save_changes()
except Exception as e:
self._logger.error(__name__, f"Cannot remove user {member.id}", e)
async def _notify_team(self, member: discord.Member):
self._logger.debug(__name__, f"Notify team that a member left")
settings: ServerConfig = self._config.get_configuration(
f"ServerConfig_{member.guild.id}"
)
channel = member.guild.get_channel(settings.team_channel_id)
await self._message_service.send_channel_message(
channel,
self._t.transform("modules.base.member_left_message").format(
member.mention
),
is_persistent=True,
)
self._logger.trace(__name__, f"Notified team that a member left")
@EventChecks.check_is_ready()
async def on_member_remove(self, member: discord.Member):
self._logger.debug(__name__, f"Module {type(self)} started")
await self._remove_user(member)
await self._notify_team(member)

View File

@@ -0,0 +1,93 @@
from typing import Optional
import discord
from cpl_core.configuration import ConfigurationABC
from cpl_core.database.context import DatabaseContextABC
from cpl_discord.events import OnMessageDeleteABC
from cpl_discord.service import DiscordBotServiceABC
from bot_core.helper.log_message_helper import LogMessageHelper
from bot_core.logging.message_logger import MessageLogger
from bot_data.abc.client_repository_abc import ClientRepositoryABC
from bot_data.abc.server_repository_abc import ServerRepositoryABC
from bot_data.abc.user_repository_abc import UserRepositoryABC
from bot_data.model.server_config import ServerConfig
from bot_data.model.user import User
class BaseOnMessageDeleteEvent(OnMessageDeleteABC):
def __init__(
self,
config: ConfigurationABC,
logger: MessageLogger,
db: DatabaseContextABC,
bot: DiscordBotServiceABC,
users: UserRepositoryABC,
clients: ClientRepositoryABC,
servers: ServerRepositoryABC,
):
OnMessageDeleteABC.__init__(self)
self._config = config
self._logger = logger
self._db = db
self._bot = bot
self._users = users
self._clients = clients
self._servers = servers
def _append_deleted_message_count(self, g_id: int):
try:
self._clients.append_deleted_message_count(self._bot.user.id, g_id, 1)
self._db.save_changes()
except Exception as e:
self._logger.error(
__name__, f"Cannot edit client {self._bot.user.id}@{g_id}", e
)
def _handle_message_delete(self, message: discord.Message):
dc_user_id = message.author.id
try:
server = self._servers.get_server_by_discord_id(message.guild.id)
except Exception as e:
self._logger.error(__name__, f"Cannot get server {message.guild.id}", e)
return
user: Optional[User] = None
try:
user = self._users.find_user_by_discord_id_and_server_id(
dc_user_id, server.id
)
except Exception as e:
self._logger.error(__name__, f"Cannot get user {dc_user_id}", e)
return
if user is None:
self._logger.error(__name__, f"User not found {dc_user_id}")
return
settings: ServerConfig = self._config.get_configuration(
f"ServerConfig_{message.guild.id}"
)
old_xp = user.xp
user.xp -= settings.xp_per_message
self._users.update_user(user)
self._db.save_changes()
self._logger.debug(
__name__,
f"Removed message from user {user}. xp: from {old_xp} to {user.xp}",
)
async def on_message_delete(self, message: discord.Message):
self._logger.debug(__name__, f"Module {type(self)} started")
if message is None or message.guild is None:
return
self._logger.info(
__name__, f"Received message: {LogMessageHelper.get_log_string(message)}"
)
self._append_deleted_message_count(message.guild.id)
if not message.author.bot:
self._handle_message_delete(message)
self._logger.debug(__name__, f"Module {type(self)} stopped")

View File

@@ -0,0 +1,103 @@
from typing import Optional
import discord
from cpl_core.configuration import ConfigurationABC
from cpl_core.database.context import DatabaseContextABC
from cpl_discord.events import OnMessageABC
from cpl_discord.service import DiscordBotServiceABC
from bot_core.helper.event_checks import EventChecks
from bot_core.helper.log_message_helper import LogMessageHelper
from bot_core.logging.message_logger import MessageLogger
from bot_core.service.client_utils_service import ClientUtilsService
from bot_data.abc.client_repository_abc import ClientRepositoryABC
from bot_data.abc.server_repository_abc import ServerRepositoryABC
from bot_data.abc.user_repository_abc import UserRepositoryABC
from bot_data.model.server_config import ServerConfig
from bot_data.model.user import User
class BaseOnMessageEvent(OnMessageABC):
def __init__(
self,
config: ConfigurationABC,
logger: MessageLogger,
client_utils: ClientUtilsService,
db: DatabaseContextABC,
bot: DiscordBotServiceABC,
users: UserRepositoryABC,
clients: ClientRepositoryABC,
servers: ServerRepositoryABC,
):
OnMessageABC.__init__(self)
self._config = config
self._logger = logger
self._client_utils = client_utils
self._bot = bot
self._db = db
self._bot = bot
self._users = users
self._clients = clients
self._servers = servers
def _append_received_message_count(self, g_id: int):
try:
self._clients.append_received_message_count(self._bot.user.id, g_id, 1)
self._db.save_changes()
except Exception as e:
self._logger.error(
__name__, f"Cannot edit client {self._bot.user.id}@{g_id}", e
)
def _handle_message_for_xp(self, message: discord.Message):
dc_user_id = message.author.id
try:
server = self._servers.get_server_by_discord_id(message.guild.id)
except Exception as e:
self._logger.error(__name__, f"Cannot get server {message.guild.id}", e)
return
user: Optional[User] = None
try:
user = self._users.find_user_by_discord_id_and_server_id(
dc_user_id, server.id
)
except Exception as e:
self._logger.error(__name__, f"Cannot get user {dc_user_id}", e)
return
if user is None:
self._logger.error(__name__, f"User not found {dc_user_id}")
return
settings: ServerConfig = self._config.get_configuration(
f"ServerConfig_{server.discord_id}"
)
if self._client_utils.is_message_xp_count_by_hour_higher_that_max_message_count_per_hour(
message.created_at, user, settings
):
return
old_xp = user.xp
user.xp += settings.xp_per_message
self._users.update_user(user)
self._db.save_changes()
self._logger.debug(
__name__, f"User {user} sent message. xp: from {old_xp} to {user.xp}"
)
@EventChecks.check_is_ready()
async def on_message(self, message: discord.Message):
self._logger.debug(__name__, f"Module {type(self)} started")
if message is None or message.guild is None:
return
self._logger.info(
__name__, f"Received message: {LogMessageHelper.get_log_string(message)}"
)
self._append_received_message_count(message.guild.id)
if not message.author.bot:
self._handle_message_for_xp(message)
self._logger.debug(__name__, f"Module {type(self)} stopped")

View File

@@ -0,0 +1,35 @@
from cpl_core.logging import LoggerABC
from cpl_discord.events.on_raw_reaction_add_abc import OnRawReactionAddABC
from cpl_discord.service import DiscordBotServiceABC
from discord import RawReactionActionEvent
from bot_core.helper.event_checks import EventChecks
from bot_data.abc.auto_role_repository_abc import AutoRoleRepositoryABC
from bot_data.abc.server_repository_abc import ServerRepositoryABC
from modules.base.helper.base_reaction_handler import BaseReactionHandler
class BaseOnRawReactionAddEvent(OnRawReactionAddABC):
def __init__(
self,
logger: LoggerABC,
bot: DiscordBotServiceABC,
servers: ServerRepositoryABC,
auto_roles: AutoRoleRepositoryABC,
reaction_handler: BaseReactionHandler,
):
OnRawReactionAddABC.__init__(self)
self._logger = logger
self._bot = bot
self._servers = servers
self._auto_roles = auto_roles
self._reaction_handler = reaction_handler
@EventChecks.check_is_ready()
async def on_raw_reaction_add(self, payload: RawReactionActionEvent):
self._logger.debug(__name__, f"Module {type(self)} started")
await self._reaction_handler.handle(payload, "add")
self._logger.debug(__name__, f"Module {type(self)} stopped")

View File

@@ -0,0 +1,35 @@
from cpl_core.logging import LoggerABC
from cpl_discord.events.on_raw_reaction_remove_abc import OnRawReactionRemoveABC
from cpl_discord.service import DiscordBotServiceABC
from discord import RawReactionActionEvent
from bot_core.helper.event_checks import EventChecks
from bot_data.abc.auto_role_repository_abc import AutoRoleRepositoryABC
from bot_data.abc.server_repository_abc import ServerRepositoryABC
from modules.base.helper.base_reaction_handler import BaseReactionHandler
class BaseOnRawReactionRemoveEvent(OnRawReactionRemoveABC):
def __init__(
self,
logger: LoggerABC,
bot: DiscordBotServiceABC,
servers: ServerRepositoryABC,
auto_roles: AutoRoleRepositoryABC,
reaction_handler: BaseReactionHandler,
):
OnRawReactionRemoveABC.__init__(self)
self._logger = logger
self._bot = bot
self._servers = servers
self._auto_roles = auto_roles
self._reaction_handler = reaction_handler
@EventChecks.check_is_ready()
async def on_raw_reaction_remove(self, payload: RawReactionActionEvent):
self._logger.debug(__name__, f"Module {type(self)} started")
await self._reaction_handler.handle(payload, "remove")
self._logger.debug(__name__, f"Module {type(self)} stopped")

View File

@@ -0,0 +1,46 @@
import discord
from cpl_core.logging import LoggerABC
from cpl_discord.events.on_scheduled_event_update_abc import OnScheduledEventUpdateABC
from cpl_discord.service import DiscordBotServiceABC
from discord import EventStatus
from modules.base.model.active_event import ActiveEvent
from modules.base.service.event_service import EventService
class BaseOnScheduledEventUpdateEvent(OnScheduledEventUpdateABC):
def __init__(
self,
logger: LoggerABC,
bot: DiscordBotServiceABC,
events: EventService,
):
OnScheduledEventUpdateABC.__init__(self)
self._logger = logger
self._bot = bot
self._events = events
async def on_scheduled_event_update(
self, before: discord.ScheduledEvent, after: discord.ScheduledEvent
):
self._logger.debug(__name__, f"Module {type(self)} started")
# save started event
if before.status != after.status and after.status == EventStatus.active:
active_event = ActiveEvent(after)
self._events.add_event(active_event)
for member in after.channel.members:
self._events.give_xp_for_event_participation(member, active_event)
# delete stopped event
if before.status != after.status and (
after.status.value == EventStatus.cancelled.value
or after.status.value == EventStatus.completed.value
):
event = self._events.get_active_event(after)
if event is None:
return
self._events.remove_event(event)
self._logger.debug(__name__, f"Module {type(self)} stopped")

View File

@@ -0,0 +1,168 @@
from datetime import datetime
from typing import Optional
import discord
from cpl_core.configuration import ConfigurationABC
from cpl_core.database.context import DatabaseContextABC
from cpl_core.logging import LoggerABC
from cpl_discord.events import OnVoiceStateUpdateABC
from bot_core.helper.event_checks import EventChecks
from bot_data.abc.known_user_repository_abc import KnownUserRepositoryABC
from bot_data.abc.server_repository_abc import ServerRepositoryABC
from bot_data.abc.user_joined_server_repository_abc import UserJoinedServerRepositoryABC
from bot_data.abc.user_joined_voice_channel_repository_abc import (
UserJoinedVoiceChannelRepositoryABC,
)
from bot_data.abc.user_repository_abc import UserRepositoryABC
from bot_data.model.server import Server
from bot_data.model.server_config import ServerConfig
from bot_data.model.user import User
from bot_data.model.user_joined_voice_channel import UserJoinedVoiceChannel
class BaseOnVoiceStateUpdateEvent(OnVoiceStateUpdateABC):
def __init__(
self,
config: ConfigurationABC,
logger: LoggerABC,
servers: ServerRepositoryABC,
known_users: KnownUserRepositoryABC,
users: UserRepositoryABC,
user_joins: UserJoinedServerRepositoryABC,
user_joins_vc: UserJoinedVoiceChannelRepositoryABC,
db: DatabaseContextABC,
):
OnVoiceStateUpdateABC.__init__(self)
self._config = config
self._logger = logger
self._servers = servers
self._known_users = known_users
self._users = users
self._user_joins = user_joins
self._user_joins_vc = user_joins_vc
self._db = db
self._logger.info(__name__, f"Module {type(self)} loaded")
def _update_voice_state(
self, joined: bool, dc_user_id: int, channel_id: int, server: Server
):
user: Optional[User] = None
try:
user = self._users.get_user_by_discord_id_and_server_id(
dc_user_id, server.id
)
except Exception as e:
self._logger.error(__name__, f"Cannot get user {dc_user_id}", e)
return
if user is None:
self._logger.error(__name__, f"User not found {dc_user_id}")
return
try:
if joined:
join = UserJoinedVoiceChannel(user, channel_id, datetime.now())
self._user_joins_vc.add_user_joined_voice_channel(join)
self._db.save_changes()
return
settings: ServerConfig = self._config.get_configuration(
f"ServerConfig_{server.discord_id}"
)
join = self._user_joins_vc.get_active_user_joined_voice_channel_by_user_id(
user.id
)
join.leaved_on = datetime.now()
old_xp = user.xp
user.xp += round(join.time * settings.xp_per_ontime_hour)
self._user_joins_vc.update_user_joined_voice_channel(join)
self._users.update_user(user)
self._db.save_changes()
self._logger.debug(
__name__,
f"User {user} leaved_on {join.leaved_on}. Ontime: {join.time}h | xp: from {old_xp} to {user.xp}",
)
except Exception as e:
self._logger.error(__name__, f"Ontime validation failed", e)
@EventChecks.check_is_ready()
async def on_voice_state_update(
self,
member: discord.Member,
before: discord.VoiceState,
after: discord.VoiceState,
):
self._logger.debug(__name__, f"Module {type(self)} started")
self._logger.trace(
__name__,
f"Detected on_voice_state_update {member.id} from {before} to {after}",
)
settings: ServerConfig = self._config.get_configuration(
f"ServerConfig_{member.guild.id}"
)
server = self._servers.get_server_by_discord_id(member.guild.id)
try:
if before.channel == after.channel:
return
# join
if (
before.channel is None
and after.channel is not None
and after.channel.id not in settings.afk_channel_ids
):
self._logger.trace(__name__, f"User {member.id} joined {after.channel}")
self._update_voice_state(True, member.id, after.channel.id, server)
# leave
elif (
before.channel is not None
and after.channel is None
and before.channel.id not in settings.afk_channel_ids
):
self._logger.trace(__name__, f"User {member.id} left {before.channel}")
self._update_voice_state(False, member.id, before.channel.id, server)
# channel to channel
elif before.channel is not None and after.channel is not None:
# joined
if (
before.channel.id in settings.afk_channel_ids
and after.channel.id not in settings.afk_channel_ids
):
self._logger.trace(
__name__, f"User {member.id} joined {after.channel}"
)
self._update_voice_state(True, member.id, after.channel.id, server)
# left
elif (
after.channel.id in settings.afk_channel_ids
and before.channel.id not in settings.afk_channel_ids
):
self._logger.trace(
__name__, f"User {member.id} left {before.channel}"
)
self._update_voice_state(
False, member.id, before.channel.id, server
)
else:
self._logger.trace(
__name__, f"User {member.id} switched to {after.channel}"
)
self._update_voice_state(
False, member.id, before.channel.id, server
)
self._update_voice_state(True, member.id, after.channel.id, server)
except Exception as e:
self._logger.error(
__name__, f"Cannot handle voice state for user {member.id}", e
)

View File

@@ -0,0 +1,66 @@
import discord
from cpl_core.configuration import ConfigurationABC
from cpl_core.logging import LoggerABC
from cpl_discord.events import OnVoiceStateUpdateABC
from cpl_translation import TranslatePipe
from bot_core.abc.message_service_abc import MessageServiceABC
from bot_core.helper.event_checks import EventChecks
from bot_data.abc.server_repository_abc import ServerRepositoryABC
from bot_data.model.server_config import ServerConfig
from modules.permission.abc.permission_service_abc import PermissionServiceABC
class BaseOnVoiceStateUpdateEventHelpChannel(OnVoiceStateUpdateABC):
def __init__(
self,
config: ConfigurationABC,
logger: LoggerABC,
servers: ServerRepositoryABC,
permissions: PermissionServiceABC,
message_service: MessageServiceABC,
t: TranslatePipe,
):
OnVoiceStateUpdateABC.__init__(self)
self._config = config
self._logger = logger
self._servers = servers
self._permissions = permissions
self._message_service = message_service
self._t = t
self._logger.info(__name__, f"Module {type(self)} loaded")
async def _notify_team(self, member: discord.Member):
self._logger.debug(__name__, f"Notify team that a member needs help")
settings: ServerConfig = self._config.get_configuration(
f"ServerConfig_{member.guild.id}"
)
channel = member.guild.get_channel(settings.team_channel_id)
await self._message_service.send_channel_message(
channel,
self._t.transform("modules.base.member_joined_help_voice_channel").format(
member.mention
),
is_persistent=True,
)
self._logger.trace(__name__, f"Notified team that a member need help")
@EventChecks.check_is_ready()
async def on_voice_state_update(
self,
member: discord.Member,
before: discord.VoiceState,
after: discord.VoiceState,
):
self._logger.debug(__name__, f"Module {type(self)} started")
server = self._servers.get_server_by_discord_id(member.guild.id)
settings: ServerConfig = self._config.get_configuration(
f"ServerConfig_{server.discord_id}"
)
if after.channel is None or after.channel.id != settings.help_voice_channel_id:
return
await self._notify_team(member)
self._logger.debug(__name__, f"Module {type(self)} stopped")

View File

@@ -0,0 +1,52 @@
import discord
from cpl_core.configuration import ConfigurationABC
from cpl_core.database.context import DatabaseContextABC
from cpl_core.logging import LoggerABC
from cpl_discord.events import OnVoiceStateUpdateABC
from bot_core.helper.event_checks import EventChecks
from bot_data.abc.server_repository_abc import ServerRepositoryABC
from bot_data.abc.user_repository_abc import UserRepositoryABC
from modules.base.service.event_service import EventService
class BaseOnVoiceStateUpdateEventScheduledEventBonus(OnVoiceStateUpdateABC):
def __init__(
self,
config: ConfigurationABC,
logger: LoggerABC,
servers: ServerRepositoryABC,
users: UserRepositoryABC,
events: EventService,
db: DatabaseContextABC,
):
OnVoiceStateUpdateABC.__init__(self)
self._config = config
self._logger = logger
self._servers = servers
self._users = users
self._events = events
self._db = db
self._logger.info(__name__, f"Module {type(self)} loaded")
@EventChecks.check_is_ready()
async def on_voice_state_update(
self,
member: discord.Member,
before: discord.VoiceState,
after: discord.VoiceState,
):
self._logger.debug(__name__, f"Module {type(self)} started")
if member.bot or after.channel is None:
self._logger.debug(__name__, f"Module {type(self)} stopped")
return
active_event = self._events.get_active_event_by_channel_id(after.channel.id)
if active_event is None:
self._logger.debug(__name__, f"Module {type(self)} stopped")
return
self._events.give_xp_for_event_participation(member, active_event)
self._logger.debug(__name__, f"Module {type(self)} stopped")

View File

@@ -0,0 +1,26 @@
# -*- coding: utf-8 -*-
"""
bot sh-edraft.de Discord bot
~~~~~~~~~~~~~~~~~~~
Discord bot for customers of sh-edraft.de
:copyright: (c) 2022 - 2023 sh-edraft.de
:license: MIT, see LICENSE for more details.
"""
__title__ = "modules.base.forms"
__author__ = "Sven Heidemann"
__license__ = "MIT"
__copyright__ = "Copyright (c) 2022 - 2023 sh-edraft.de"
__version__ = "1.2.0"
from collections import namedtuple
# imports
VersionInfo = namedtuple("VersionInfo", "major minor micro")
version_info = VersionInfo(major="1", minor="2", micro="0")

View File

@@ -0,0 +1,53 @@
import discord
from cpl_core.database.context import DatabaseContextABC
from cpl_discord.service import DiscordBotServiceABC
from cpl_translation import TranslatePipe
from discord import ui, TextStyle
from bot_core.abc.message_service_abc import MessageServiceABC
from bot_core.logging.command_logger import CommandLogger
from bot_data.model.technician_config import TechnicianConfig
class BugReportForm(ui.Modal):
description = ui.TextInput(
label="Report a bug", required=True, style=TextStyle.long
)
def __init__(
self,
technician_config: TechnicianConfig,
bot: DiscordBotServiceABC,
db: DatabaseContextABC,
logger: CommandLogger,
message_service: MessageServiceABC,
t: TranslatePipe,
):
ui.Modal.__init__(self, title=t.transform("modules.base.bug.title"))
self._technician_config = technician_config
self._bot = bot
self._db = db
self._message_service = message_service
self._logger = logger
self._t = t
self.description.label = t.transform("modules.base.bug.label")
async def on_submit(self, interaction: discord.Interaction):
self._logger.debug(__name__, f"Started bug report form")
for t in self._technician_config.technician_ids:
member = self._bot.get_user(t)
await self._message_service.send_dm_message(
self._t.transform("modules.base.bug.message").format(
interaction.user.mention, self.description.value
),
member,
without_tracking=True,
)
await self._message_service.send_interaction_msg(
interaction, self._t.transform("modules.base.bug.response")
)
self._logger.trace(__name__, f"Finished bug report form")

View File

@@ -0,0 +1,51 @@
import discord
from cpl_core.configuration import ConfigurationABC
from cpl_core.database.context import DatabaseContextABC
from cpl_translation import TranslatePipe
from discord import ui, TextStyle
from bot_core.abc.message_service_abc import MessageServiceABC
from bot_core.logging.command_logger import CommandLogger
from bot_data.model.server_config import ServerConfig
class ComplaintForm(ui.Modal):
description = ui.TextInput(
label="Complain about something", required=True, style=TextStyle.long
)
def __init__(
self,
config: ConfigurationABC,
db: DatabaseContextABC,
logger: CommandLogger,
message_service: MessageServiceABC,
t: TranslatePipe,
):
ui.Modal.__init__(self, title=t.transform("modules.base.complaints.title"))
self._config = config
self._db = db
self._message_service = message_service
self._logger = logger
self._t = t
self.description.label = t.transform("modules.base.complaints.label")
async def on_submit(self, interaction: discord.Interaction):
self._logger.debug(__name__, f"Started complaint command form")
settings: ServerConfig = self._config.get_configuration(
f"ServerConfig_{interaction.guild.id}"
)
channel = interaction.guild.get_channel(settings.team_channel_id)
await self._message_service.send_channel_message(
channel,
self._t.transform("modules.base.complaints.message").format(
interaction.user.mention, self.description.value
),
is_persistent=True,
)
await self._message_service.send_interaction_msg(
interaction, self._t.transform("modules.base.complaints.response")
)
self._logger.trace(__name__, f"Finished complaint command form")

View File

@@ -0,0 +1,26 @@
# -*- coding: utf-8 -*-
"""
bot sh-edraft.de Discord bot
~~~~~~~~~~~~~~~~~~~
Discord bot for customers of sh-edraft.de
:copyright: (c) 2022 - 2023 sh-edraft.de
:license: MIT, see LICENSE for more details.
"""
__title__ = "modules.base.helper"
__author__ = "Sven Heidemann"
__license__ = "MIT"
__copyright__ = "Copyright (c) 2022 - 2023 sh-edraft.de"
__version__ = "1.2.0"
from collections import namedtuple
# imports:
VersionInfo = namedtuple("VersionInfo", "major minor micro")
version_info = VersionInfo(major="1", minor="2", micro="0")

View File

@@ -0,0 +1,97 @@
from datetime import datetime
from cpl_core.configuration import ConfigurationABC
from cpl_core.database.context import DatabaseContextABC
from cpl_discord.service import DiscordBotServiceABC
from discord import RawReactionActionEvent
from bot_core.abc.client_utils_abc import ClientUtilsABC
from bot_core.helper.log_message_helper import LogMessageHelper
from bot_core.logging.message_logger import MessageLogger
from bot_data.abc.server_repository_abc import ServerRepositoryABC
from bot_data.abc.user_repository_abc import UserRepositoryABC
from bot_data.model.server_config import ServerConfig
class BaseReactionHandler:
def __init__(
self,
config: ConfigurationABC,
logger: MessageLogger,
bot: DiscordBotServiceABC,
servers: ServerRepositoryABC,
users: UserRepositoryABC,
client_utils: ClientUtilsABC,
db: DatabaseContextABC,
):
self._config = config
self._logger = logger
self._bot = bot
self._servers = servers
self._users = users
self._client_utils = client_utils
self._db = db
async def handle(self, payload: RawReactionActionEvent, r_type=None) -> None:
self._logger.trace(__name__, f"Handle reaction {payload} {r_type}")
guild = self._bot.get_guild(payload.guild_id)
member = guild.get_member(payload.user_id)
if member is None:
self._logger.warn(
__name__, f"User {payload.user_id} in {guild.name} not found - skipping"
)
return
try:
log_msg = f"{member.name} reacted"
if payload.emoji.name is not None:
log_msg += f" with {payload.emoji.name}"
try:
channel = guild.get_channel(payload.channel_id)
if channel is None:
self._logger.warn(__name__, f"Channel not found")
return
message = await channel.fetch_message(payload.message_id)
self._logger.info(
__name__,
f"{log_msg} to message {LogMessageHelper.get_log_string(message)}",
)
if member == message.author:
self._logger.debug(__name__, f"Skipping reaction handling")
return
except Exception as e:
self._logger.error(
__name__, f"Getting message for reaction logging failed", e
)
self._logger.info(
__name__, f"{log_msg} to message {payload.message_id}"
)
except Exception as e:
self._logger.error(__name__, f"Reaction logging failed", e)
if member.bot:
return
server = self._servers.get_server_by_discord_id(guild.id)
user = self._users.get_user_by_discord_id_and_server_id(member.id, server.id)
settings: ServerConfig = self._config.get_configuration(
f"ServerConfig_{guild.id}"
)
if r_type == "add":
if self._client_utils.is_message_xp_count_by_hour_higher_that_max_message_count_per_hour(
datetime.now(), user, settings, is_reaction=True
):
return
user.xp += settings.xp_per_reaction
self._users.update_user(user)
self._db.save_changes()
elif r_type == "remove":
user.xp -= settings.xp_per_reaction
self._users.update_user(user)
self._db.save_changes()
else:
self._logger.warn(__name__, f"Invalid reaction type {r_type}")

View File

@@ -0,0 +1,41 @@
import discord
from cpl_core.dependency_injection import ServiceProviderABC
from cpl_query.extension import List
from discord import Interaction, app_commands
from discord.app_commands import Transformer, Choice
from bot_core.abc.client_utils_abc import ClientUtilsABC
class VoiceChannelTransformer(Transformer):
async def transform(
self, interaction: Interaction, value: str, /
) -> discord.VoiceChannel:
voice_channel = (
List(discord.VoiceChannel, interaction.guild.voice_channels)
.where(lambda x: str(x.id) == value)
.first_or_default()
)
return voice_channel
async def autocomplete(
self, interaction: Interaction, current: str, /
) -> list[Choice[str]]:
@ServiceProviderABC.inject
def get_client_utils(client_utils: ClientUtilsABC) -> ClientUtilsABC:
return client_utils
voice_channels = List(discord.Role, interaction.guild.voice_channels).where(
lambda x: len(x.members) > 0
)
return [
app_commands.Choice(
name=f"{vc.name}"
if vc.category is None
else f"{vc.name}: {vc.category.name}",
value=vc.name,
)
for vc in get_client_utils().get_auto_complete_list(
voice_channels, current, lambda x: x.name
)
]

View File

@@ -0,0 +1,26 @@
# -*- coding: utf-8 -*-
"""
bot sh-edraft.de Discord bot
~~~~~~~~~~~~~~~~~~~
Discord bot for customers of sh-edraft.de
:copyright: (c) 2022 - 2023 sh-edraft.de
:license: MIT, see LICENSE for more details.
"""
__title__ = "modules.base.model"
__author__ = "Sven Heidemann"
__license__ = "MIT"
__copyright__ = "Copyright (c) 2022 - 2023 sh-edraft.de"
__version__ = "1.2.0"
from collections import namedtuple
# imports:
VersionInfo = namedtuple("VersionInfo", "major minor micro")
version_info = VersionInfo(major="1", minor="2", micro="0")

View File

@@ -0,0 +1,18 @@
import discord
from cpl_query.extension import List
from bot_data.model.user import User
class ActiveEvent:
def __init__(self, event: discord.ScheduledEvent):
self._event = event
self._participants = List(User)
@property
def event(self) -> discord.ScheduledEvent:
return self._event
@property
def participants(self) -> List[User]:
return self._participants

View File

@@ -0,0 +1,26 @@
# -*- coding: utf-8 -*-
"""
bot sh-edraft.de Discord bot
~~~~~~~~~~~~~~~~~~~
Discord bot for customers of sh-edraft.de
:copyright: (c) 2022 - 2023 sh-edraft.de
:license: MIT, see LICENSE for more details.
"""
__title__ = "modules.base.service"
__author__ = "Sven Heidemann"
__license__ = "MIT"
__copyright__ = "Copyright (c) 2022 - 2023 sh-edraft.de"
__version__ = "1.2.0"
from collections import namedtuple
# imports
VersionInfo = namedtuple("VersionInfo", "major minor micro")
version_info = VersionInfo(major="1", minor="2", micro="0")

View File

@@ -0,0 +1,69 @@
from typing import Optional
import discord
from cpl_core.configuration import ConfigurationABC
from cpl_core.database.context import DatabaseContextABC
from cpl_core.logging import LoggerABC
from cpl_query.extension import List
from bot_data.abc.server_repository_abc import ServerRepositoryABC
from bot_data.abc.user_repository_abc import UserRepositoryABC
from bot_data.model.server_config import ServerConfig
from modules.base.model.active_event import ActiveEvent
class EventService:
def __init__(
self,
config: ConfigurationABC,
logger: LoggerABC,
servers: ServerRepositoryABC,
users: UserRepositoryABC,
db: DatabaseContextABC,
):
self._config = config
self._logger = logger
self._servers = servers
self._users = users
self._db = db
self._active_events = List(ActiveEvent)
def add_event(self, event: ActiveEvent):
if self._active_events.contains(event):
return
self._active_events.add(event)
def get_active_event(self, event: discord.ScheduledEvent) -> Optional[ActiveEvent]:
return self._active_events.where(
lambda x: x.event.id == event.id
).single_or_default()
def get_active_event_by_channel_id(self, channel_id: int) -> Optional[ActiveEvent]:
return self._active_events.where(
lambda x: x.event.channel is not None and x.event.channel.id == channel_id
).single_or_default()
def remove_event(self, event: ActiveEvent):
if not self._active_events.contains(event):
return
self._active_events.remove(event)
def give_xp_for_event_participation(
self, member: discord.Member, active_event: ActiveEvent
):
server = self._servers.get_server_by_discord_id(member.guild.id)
user = self._users.get_user_by_discord_id_and_server_id(member.id, server.id)
if active_event.participants.any(lambda x: x.id == user.id):
self._logger.debug(__name__, f"Module {type(self)} stopped")
return
settings: ServerConfig = self._config.get_configuration(
f"ServerConfig_{server.discord_id}"
)
user.xp += settings.xp_per_event_participation
self._users.update_user(user)
self._db.save_changes()
active_event.participants.append(user)

View File

@@ -0,0 +1,211 @@
import discord
from cpl_core.configuration import ConfigurationABC
from cpl_core.database.context import DatabaseContextABC
from cpl_core.logging import LoggerABC
from cpl_discord.service import DiscordBotServiceABC
from cpl_translation import TranslatePipe
from bot_core.abc.message_service_abc import MessageServiceABC
from bot_data.abc.level_repository_abc import LevelRepositoryABC
from bot_data.abc.server_repository_abc import ServerRepositoryABC
from bot_data.abc.user_repository_abc import UserRepositoryABC
from bot_data.abc.user_warnings_repository_abc import UserWarningsRepositoryABC
from bot_data.model.server_config import ServerConfig
from bot_data.model.user import User
from bot_data.model.user_warnings import UserWarnings
from modules.level.service.level_service import LevelService
from modules.permission.abc.permission_service_abc import PermissionServiceABC
class UserWarningsService:
def __init__(
self,
config: ConfigurationABC,
logger: LoggerABC,
db: DatabaseContextABC,
bot: DiscordBotServiceABC,
warnings: UserWarningsRepositoryABC,
users: UserRepositoryABC,
servers: ServerRepositoryABC,
levels: LevelRepositoryABC,
level_service: LevelService,
message_service: MessageServiceABC,
t: TranslatePipe,
permissions: PermissionServiceABC,
):
self._config = config
self._logger = logger
self._db = db
self._bot = bot
self._warnings = warnings
self._users = users
self._servers = servers
self._levels = levels
self._level_service = level_service
self._message_service = message_service
self._t = t
self._permissions = permissions
async def notify_team(
self, member: discord.Member, description: str, removed=False
):
try:
settings: ServerConfig = self._config.get_configuration(
f"ServerConfig_{member.guild.id}"
)
channel = member.guild.get_channel(settings.team_channel_id)
if removed:
translation = self._t.transform(
"modules.base.warnings.team_removed"
).format(description, member.mention)
else:
translation = self._t.transform(
"modules.base.warnings.team_warned"
).format(member.mention, description)
self._bot.loop.create_task(
self._message_service.send_channel_message(
channel, translation, is_persistent=True
)
)
except Exception as e:
self._logger.error(
__name__, f"Team notification for user warning failed!", e
)
async def notify_user(self, member: discord.Member, message: str):
try:
# run as task to keep the interaction alive
self._bot.loop.create_task(
self._message_service.send_dm_message(message, member)
)
except Exception as e:
self._logger.error(
__name__, f"User notification for user warning failed!", e
)
async def check_for_warnings(self, member: discord.Member, user: User):
existing_warnings = self._warnings.get_user_warnings_by_user_id(user.id)
if existing_warnings.count() == 1:
await self.notify_user(
member, self._t.transform("modules.base.warnings.first")
)
elif existing_warnings.count() == 2:
server = self._servers.get_server_by_discord_id(member.guild.id)
user = self._users.get_user_by_discord_id_and_server_id(
member.id, server.id
)
level = self._level_service.get_level(user)
levels = self._levels.get_levels_by_server_id(server.id).order_by(
lambda l: l.min_xp
)
new_level = levels.where(
lambda l: l.min_xp < level.min_xp
).last_or_default()
if new_level is not None:
user.xp = new_level.min_xp
self._users.update_user(user)
self._db.save_changes()
await self.notify_user(
member, self._t.transform("modules.base.warnings.second")
)
elif existing_warnings.count() == 3:
server = self._servers.get_server_by_discord_id(member.guild.id)
user = self._users.get_user_by_discord_id_and_server_id(
member.id, server.id
)
levels = self._levels.get_levels_by_server_id(server.id)
new_level = (
levels.where(lambda l: l.min_xp > 0)
.order_by(lambda l: l.min_xp)
.last_or_default()
)
if new_level is not None:
user.xp = new_level.min_xp
self._users.update_user(user)
self._db.save_changes()
await self.notify_user(
member, self._t.transform("modules.base.warnings.third")
)
elif existing_warnings.count() >= 4:
user.xp = 0
self._users.update_user(user)
self._db.save_changes()
await self.notify_team(
member,
self._t.transform("modules.base.warnings.kick").format(member.mention),
)
await member.kick()
async def _notify_after_add(self, member: discord.Member, warning: UserWarnings):
server = self._servers.get_server_by_discord_id(member.guild.id)
user = self._users.get_user_by_discord_id_and_server_id(member.id, server.id)
await self.notify_user(
member,
self._t.transform("modules.base.warnings.warned").format(
warning.description
),
)
await self.notify_team(member, warning.description)
await self.check_for_warnings(member, user)
def _add_warnings(
self, member: discord.Member, description: str, author_id: int = None
):
server = self._servers.get_server_by_discord_id(member.guild.id)
user = self._users.get_user_by_discord_id_and_server_id(member.id, server.id)
author = None
if author_id is not None:
author = self._users.get_user_by_discord_id_and_server_id(
author_id, server.id
)
warning = UserWarnings(description, user, author)
self._warnings.add_user_warnings(warning)
self._db.save_changes()
return warning
def add_warnings(
self, member: discord.Member, description: str, author_id: int = None
):
warning = self._add_warnings(member, description, author_id)
self._bot.loop.create_task(self._notify_after_add(member, warning))
async def add_warnings_async(
self, member: discord.Member, description: str, author_id: int = None
):
warning = self._add_warnings(member, description, author_id)
await self._notify_after_add(member, warning)
async def _notify_after_remove(self, warning: UserWarnings):
guild = self._bot.get_guild(warning.user.server.discord_id)
member = guild.get_member(warning.user.discord_id)
await self.notify_user(
member,
self._t.transform("modules.base.warnings.removed").format(
warning.description
),
)
await self.notify_team(member, warning.description, removed=True)
def _remove_warnings(self, id: int):
warning = self._warnings.get_user_warnings_by_id(id)
self._warnings.delete_user_warnings(warning)
self._db.save_changes()
return warning
def remove_warnings(self, id: int):
warning = self._remove_warnings(id)
self._bot.loop.create_task(self._notify_after_remove(warning))
async def remove_warnings_async(self, id: int):
warning = self._remove_warnings(id)
await self._notify_after_remove(warning)

View File

@@ -0,0 +1,26 @@
# -*- coding: utf-8 -*-
"""
bot sh-edraft.de Discord bot
~~~~~~~~~~~~~~~~~~~
Discord bot for customers of sh-edraft.de
:copyright: (c) 2022 - 2023 sh-edraft.de
:license: MIT, see LICENSE for more details.
"""
__title__ = "modules.base.thread"
__author__ = "Sven Heidemann"
__license__ = "MIT"
__copyright__ = "Copyright (c) 2022 - 2023 sh-edraft.de"
__version__ = "1.2.0"
from collections import namedtuple
# imports
VersionInfo = namedtuple("VersionInfo", "major minor micro")
version_info = VersionInfo(major="1", minor="2", micro="0")