1.0.0 #253
| @@ -16,6 +16,7 @@ from bot_data.migration.user_joined_game_server_migration import UserJoinedGameS | ||||
| from bot_data.migration.user_message_count_per_hour_migration import ( | ||||
|     UserMessageCountPerHourMigration, | ||||
| ) | ||||
| from bot_data.migration.user_warning_migration import UserWarningMigration | ||||
| from bot_data.service.migration_service import MigrationService | ||||
|  | ||||
|  | ||||
| @@ -38,3 +39,4 @@ class StartupMigrationExtension(StartupExtensionABC): | ||||
|         services.add_transient(MigrationABC, ApiKeyMigration)  # 09.02.2023 #162 - 1.0.0 | ||||
|         services.add_transient(MigrationABC, UserJoinedGameServerMigration)  # 12.02.2023 #181 - 1.0.0 | ||||
|         services.add_transient(MigrationABC, RemoveStatsMigration)  # 19.02.2023 #190 - 1.0.0 | ||||
|         services.add_transient(MigrationABC, UserWarningMigration)  # 21.02.2023 #35 - 1.0.0 | ||||
|   | ||||
| @@ -207,6 +207,28 @@ | ||||
|       }, | ||||
|       "unregister": { | ||||
|         "success": "Verlinkung wurde entfernt :D" | ||||
|       }, | ||||
|       "warnings": { | ||||
|         "warned": "Du wurdest verwarnt. Der Grund ist: {}", | ||||
|         "team_warned": "{} wurde verwarnt. Der Grund ist: {}", | ||||
|         "removed": "Die Verwarnung '{}' wurde entfernt.", | ||||
|         "team_removed": "Die Verwarnung '{}' an {} wurde entfernt.", | ||||
|         "first": "Bei der nächsten verwarnung, wirst du auf das vorherige Level zurückgesetzt!", | ||||
|         "second": "Bei der nächsten verwarnung, wirst du auf das erste Level zurückgesetzt!", | ||||
|         "third": "Bei der nächsten verwarnung, wirst du gekickt und zurückgesetzt!", | ||||
|         "kick": "Ich musste {} aufgrund zu vieler Verwarnungen kicken", | ||||
|         "show": { | ||||
|           "id": "Id", | ||||
|           "description": "Beschreibung" | ||||
|         }, | ||||
|         "add": { | ||||
|           "success": "Verwarnung wurde hinzugefügt :)", | ||||
|           "failed": "Verwarnung konnte nicht hinzugefügt werden :(" | ||||
|         }, | ||||
|         "remove": { | ||||
|           "success": "Verwarnung wurde entfernt :)", | ||||
|           "failed": "Verwarnung konnte nicht entfernt werden :(" | ||||
|         } | ||||
|       } | ||||
|     }, | ||||
|     "boot_log": { | ||||
|   | ||||
							
								
								
									
										35
									
								
								kdb-bot/src/bot_data/abc/user_warnings_repository_abc.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										35
									
								
								kdb-bot/src/bot_data/abc/user_warnings_repository_abc.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,35 @@ | ||||
| from abc import ABC, abstractmethod | ||||
|  | ||||
| from cpl_query.extension import List | ||||
|  | ||||
| from bot_data.model.user_warnings import UserWarnings | ||||
|  | ||||
|  | ||||
| class UserWarningsRepositoryABC(ABC): | ||||
|     @abstractmethod | ||||
|     def __init__(self): | ||||
|         pass | ||||
|  | ||||
|     @abstractmethod | ||||
|     def get_user_warnings(self) -> List[UserWarnings]: | ||||
|         pass | ||||
|  | ||||
|     @abstractmethod | ||||
|     def get_user_warnings_by_id(self, id: int) -> UserWarnings: | ||||
|         pass | ||||
|  | ||||
|     @abstractmethod | ||||
|     def get_user_warnings_by_user_id(self, user_id: int) -> List[UserWarnings]: | ||||
|         pass | ||||
|  | ||||
|     @abstractmethod | ||||
|     def add_user_warnings(self, user_warnings: UserWarnings): | ||||
|         pass | ||||
|  | ||||
|     @abstractmethod | ||||
|     def update_user_warnings(self, user_warnings: UserWarnings): | ||||
|         pass | ||||
|  | ||||
|     @abstractmethod | ||||
|     def delete_user_warnings(self, user_warnings: UserWarnings): | ||||
|         pass | ||||
| @@ -21,6 +21,7 @@ from bot_data.abc.user_message_count_per_hour_repository_abc import ( | ||||
|     UserMessageCountPerHourRepositoryABC, | ||||
| ) | ||||
| from bot_data.abc.user_repository_abc import UserRepositoryABC | ||||
| from bot_data.abc.user_warnings_repository_abc import UserWarningsRepositoryABC | ||||
| from bot_data.service.api_key_repository_service import ApiKeyRepositoryService | ||||
| from bot_data.service.auth_user_repository_service import AuthUserRepositoryService | ||||
| from bot_data.service.auto_role_repository_service import AutoRoleRepositoryService | ||||
| @@ -40,6 +41,7 @@ from bot_data.service.user_message_count_per_hour_repository_service import ( | ||||
|     UserMessageCountPerHourRepositoryService, | ||||
| ) | ||||
| from bot_data.service.user_repository_service import UserRepositoryService | ||||
| from bot_data.service.user_warnings_repository_service import UserWarningsRepositoryService | ||||
|  | ||||
|  | ||||
| class DataModule(ModuleABC): | ||||
| @@ -61,6 +63,7 @@ class DataModule(ModuleABC): | ||||
|         services.add_transient(UserJoinedGameServerRepositoryABC, UserJoinedGameServerRepositoryService) | ||||
|         services.add_transient(AutoRoleRepositoryABC, AutoRoleRepositoryService) | ||||
|         services.add_transient(LevelRepositoryABC, LevelRepositoryService) | ||||
|         services.add_transient(UserWarningsRepositoryABC, UserWarningsRepositoryService) | ||||
|         services.add_transient( | ||||
|             UserMessageCountPerHourRepositoryABC, | ||||
|             UserMessageCountPerHourRepositoryService, | ||||
|   | ||||
							
								
								
									
										37
									
								
								kdb-bot/src/bot_data/migration/user_warning_migration.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										37
									
								
								kdb-bot/src/bot_data/migration/user_warning_migration.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,37 @@ | ||||
| from bot_core.logging.database_logger import DatabaseLogger | ||||
| from bot_data.abc.migration_abc import MigrationABC | ||||
| from bot_data.db_context import DBContext | ||||
|  | ||||
|  | ||||
| class UserWarningMigration(MigrationABC): | ||||
|     name = "1.0_UserWarningMigration" | ||||
|  | ||||
|     def __init__(self, logger: DatabaseLogger, db: DBContext): | ||||
|         MigrationABC.__init__(self) | ||||
|         self._logger = logger | ||||
|         self._db = db | ||||
|         self._cursor = db.cursor | ||||
|  | ||||
|     def upgrade(self): | ||||
|         self._logger.debug(__name__, "Running upgrade") | ||||
|  | ||||
|         self._cursor.execute( | ||||
|             str( | ||||
|                 f""" | ||||
|             CREATE TABLE IF NOT EXISTS `UserWarnings` ( | ||||
|                 `Id` BIGINT NOT NULL AUTO_INCREMENT, | ||||
|                 `Description` VARCHAR(255) NOT NULL, | ||||
|                 `UserId` BIGINT NOT NULL, | ||||
|                 `Author` BIGINT NULL, | ||||
|                 `CreatedAt` DATETIME(6), | ||||
|                 `LastModifiedAt` DATETIME(6), | ||||
|                 PRIMARY KEY(`Id`), | ||||
|                 FOREIGN KEY (`UserId`) REFERENCES `Users`(`UserId`), | ||||
|                 FOREIGN KEY (`Author`) REFERENCES `Users`(`UserId`) | ||||
|             ); | ||||
|             """ | ||||
|             ) | ||||
|         ) | ||||
|  | ||||
|     def downgrade(self): | ||||
|         self._cursor.execute("DROP TABLE `UserWarnings`;") | ||||
							
								
								
									
										105
									
								
								kdb-bot/src/bot_data/model/user_warnings.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										105
									
								
								kdb-bot/src/bot_data/model/user_warnings.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,105 @@ | ||||
| from datetime import datetime | ||||
| from typing import Optional | ||||
|  | ||||
| from cpl_core.database import TableABC | ||||
|  | ||||
| from bot_data.model.user import User | ||||
|  | ||||
|  | ||||
| # had to name it UserWarnings instead of UserWarning because UserWarning is a builtin class | ||||
| class UserWarnings(TableABC): | ||||
|     def __init__( | ||||
|         self, | ||||
|         description: str, | ||||
|         user: User, | ||||
|         author: Optional[User], | ||||
|         created_at: datetime = None, | ||||
|         modified_at: datetime = None, | ||||
|         id=0, | ||||
|     ): | ||||
|         self._id = id | ||||
|         self._description = description | ||||
|         self._user = user | ||||
|         self._author = author | ||||
|  | ||||
|         TableABC.__init__(self) | ||||
|         self._created_at = created_at if created_at is not None else self._created_at | ||||
|         self._modified_at = modified_at if modified_at is not None else self._modified_at | ||||
|  | ||||
|     @property | ||||
|     def id(self) -> int: | ||||
|         return self._id | ||||
|  | ||||
|     @property | ||||
|     def description(self) -> str: | ||||
|         return self._description | ||||
|  | ||||
|     @property | ||||
|     def user(self) -> User: | ||||
|         return self._user | ||||
|  | ||||
|     @property | ||||
|     def author(self) -> Optional[User]: | ||||
|         return self._author | ||||
|  | ||||
|     @staticmethod | ||||
|     def get_select_all_string() -> str: | ||||
|         return str( | ||||
|             f""" | ||||
|                 SELECT * FROM `UserWarnings`; | ||||
|             """ | ||||
|         ) | ||||
|  | ||||
|     @staticmethod | ||||
|     def get_select_by_id_string(id: int) -> str: | ||||
|         return str( | ||||
|             f""" | ||||
|                 SELECT * FROM `UserWarnings` | ||||
|                 WHERE `Id` = {id}; | ||||
|             """ | ||||
|         ) | ||||
|  | ||||
|     @staticmethod | ||||
|     def get_select_by_user_id_string(id: int) -> str: | ||||
|         return str( | ||||
|             f""" | ||||
|                 SELECT * FROM `UserWarnings` | ||||
|                 WHERE `UserId` = {id}; | ||||
|             """ | ||||
|         ) | ||||
|  | ||||
|     @property | ||||
|     def insert_string(self) -> str: | ||||
|         return str( | ||||
|             f""" | ||||
|             INSERT INTO `UserWarnings` ( | ||||
|                 `Description`, `UserId`, {"" if self._author is None else f"`Author`,"} `CreatedAt`, `LastModifiedAt` | ||||
|             ) VALUES ( | ||||
|                 '{self._description}', | ||||
|                 {self._user.id}, | ||||
|                 {"" if self._author is None else f"{self._author.id},"} | ||||
|                 '{self._created_at}', | ||||
|                 '{self._modified_at}' | ||||
|             ); | ||||
|             """ | ||||
|         ) | ||||
|  | ||||
|     @property | ||||
|     def udpate_string(self) -> str: | ||||
|         return str( | ||||
|             f""" | ||||
|             UPDATE `UserWarnings` | ||||
|             SET `Description` = '{self._description}', | ||||
|             `LastModifiedAt` = '{self._modified_at}' | ||||
|             WHERE `Id` = {self._id}; | ||||
|             """ | ||||
|         ) | ||||
|  | ||||
|     @property | ||||
|     def delete_string(self) -> str: | ||||
|         return str( | ||||
|             f""" | ||||
|             DELETE FROM `UserWarnings` | ||||
|             WHERE `Id` = {self._id}; | ||||
|             """ | ||||
|         ) | ||||
| @@ -0,0 +1,81 @@ | ||||
| from typing import Optional | ||||
|  | ||||
| from cpl_core.database.context import DatabaseContextABC | ||||
| from cpl_query.extension import List | ||||
|  | ||||
| from bot_core.logging.database_logger import DatabaseLogger | ||||
| from bot_data.abc.user_repository_abc import UserRepositoryABC | ||||
| from bot_data.abc.user_warnings_repository_abc import UserWarningsRepositoryABC | ||||
| from bot_data.model.user_warnings import UserWarnings | ||||
|  | ||||
|  | ||||
| class UserWarningsRepositoryService(UserWarningsRepositoryABC): | ||||
|     def __init__( | ||||
|         self, | ||||
|         logger: DatabaseLogger, | ||||
|         db_context: DatabaseContextABC, | ||||
|         users: UserRepositoryABC, | ||||
|     ): | ||||
|         self._logger = logger | ||||
|         self._context = db_context | ||||
|  | ||||
|         self._users = users | ||||
|         UserWarningsRepositoryABC.__init__(self) | ||||
|  | ||||
|     @staticmethod | ||||
|     def _get_value_from_result(value: any) -> Optional[any]: | ||||
|         if isinstance(value, str) and "NULL" in value: | ||||
|             return None | ||||
|  | ||||
|         return value | ||||
|  | ||||
|     def _from_result(self, sql_result: tuple) -> UserWarnings: | ||||
|         user = self._users.get_user_by_id(self._get_value_from_result(sql_result[2])) | ||||
|         author = None | ||||
|         author_id = self._get_value_from_result(sql_result[2]) | ||||
|         if author_id is not None: | ||||
|             author = self._users.get_user_by_id(author_id) | ||||
|  | ||||
|         return UserWarnings( | ||||
|             self._get_value_from_result(sql_result[1]), | ||||
|             user, | ||||
|             author, | ||||
|             self._get_value_from_result(sql_result[4]), | ||||
|             self._get_value_from_result(sql_result[5]), | ||||
|             id=self._get_value_from_result(sql_result[0]), | ||||
|         ) | ||||
|  | ||||
|     def get_user_warnings(self) -> List[UserWarnings]: | ||||
|         warnings = List(UserWarnings) | ||||
|         self._logger.trace(__name__, f"Send SQL command: {UserWarnings.get_select_all_string()}") | ||||
|         results = self._context.select(UserWarnings.get_select_all_string()) | ||||
|         for result in results: | ||||
|             warnings.append(self._from_result(result)) | ||||
|  | ||||
|         return warnings | ||||
|  | ||||
|     def get_user_warnings_by_id(self, id: int) -> UserWarnings: | ||||
|         self._logger.trace(__name__, f"Send SQL command: {UserWarnings.get_select_by_id_string(id)}") | ||||
|         result = self._context.select(UserWarnings.get_select_by_id_string(id))[0] | ||||
|         return self._from_result(result) | ||||
|  | ||||
|     def get_user_warnings_by_user_id(self, user_id: int) -> List[UserWarnings]: | ||||
|         warnings = List(UserWarnings) | ||||
|         self._logger.trace(__name__, f"Send SQL command: {UserWarnings.get_select_by_user_id_string(user_id)}") | ||||
|         results = self._context.select(UserWarnings.get_select_by_user_id_string(user_id)) | ||||
|         for result in results: | ||||
|             warnings.append(self._from_result(result)) | ||||
|  | ||||
|         return warnings | ||||
|  | ||||
|     def add_user_warnings(self, user_warnings: UserWarnings): | ||||
|         self._logger.trace(__name__, f"Send SQL command: {user_warnings.insert_string}") | ||||
|         self._context.cursor.execute(user_warnings.insert_string) | ||||
|  | ||||
|     def update_user_warnings(self, user_warnings: UserWarnings): | ||||
|         self._logger.trace(__name__, f"Send SQL command: {user_warnings.udpate_string}") | ||||
|         self._context.cursor.execute(user_warnings.udpate_string) | ||||
|  | ||||
|     def delete_user_warnings(self, user_warnings: UserWarnings): | ||||
|         self._logger.trace(__name__, f"Send SQL command: {user_warnings.delete_string}") | ||||
|         self._context.cursor.execute(user_warnings.delete_string) | ||||
| @@ -38,6 +38,7 @@ from modules.base.events.base_on_voice_state_update_event_scheduled_event_bonus | ||||
| from modules.base.helper.base_reaction_handler import BaseReactionHandler | ||||
| from modules.base.service.base_helper_service import BaseHelperService | ||||
| from modules.base.service.event_service import EventService | ||||
| from modules.base.service.user_warnings_service import UserWarningsService | ||||
|  | ||||
|  | ||||
| class BaseModule(ModuleABC): | ||||
| @@ -51,6 +52,7 @@ class BaseModule(ModuleABC): | ||||
|         services.add_transient(BaseHelperABC, BaseHelperService) | ||||
|         services.add_transient(BaseReactionHandler) | ||||
|         services.add_singleton(EventService) | ||||
|         services.add_transient(UserWarningsService) | ||||
|  | ||||
|         # commands | ||||
|         self._dc.add_command(AFKCommand) | ||||
|   | ||||
| @@ -21,6 +21,8 @@ from bot_data.abc.user_joined_voice_channel_repository_abc import ( | ||||
|     UserJoinedVoiceChannelRepositoryABC, | ||||
| ) | ||||
| from bot_data.abc.user_repository_abc import UserRepositoryABC | ||||
| from bot_data.abc.user_warnings_repository_abc import UserWarningsRepositoryABC | ||||
| from modules.base.service.user_warnings_service import UserWarningsService | ||||
| from modules.level.service.level_service import LevelService | ||||
| from modules.permission.abc.permission_service_abc import PermissionServiceABC | ||||
|  | ||||
| @@ -42,6 +44,8 @@ class UserGroup(DiscordCommandABC): | ||||
|         translate: TranslatePipe, | ||||
|         date: DateTimeOffsetPipe, | ||||
|         level: LevelService, | ||||
|         user_warnings: UserWarningsRepositoryABC, | ||||
|         user_warnings_service: UserWarningsService, | ||||
|     ): | ||||
|         DiscordCommandABC.__init__(self) | ||||
|  | ||||
| @@ -59,6 +63,8 @@ class UserGroup(DiscordCommandABC): | ||||
|         self._t = translate | ||||
|         self._date = date | ||||
|         self._level = level | ||||
|         self._user_warnings = user_warnings | ||||
|         self._user_warnings_service = user_warnings_service | ||||
|  | ||||
|         self._logger.trace(__name__, f"Loaded command service: {type(self).__name__}") | ||||
|  | ||||
| @@ -181,9 +187,13 @@ class UserGroup(DiscordCommandABC): | ||||
|             ) | ||||
|  | ||||
|         if is_mod or member == ctx.author: | ||||
|             warnings_string = "" | ||||
|             for warning in self._user_warnings.get_user_warnings_by_user_id(user.id): | ||||
|                 warnings_string += f"{warning.id} - {warning.description}\n" | ||||
|  | ||||
|             embed.add_field( | ||||
|                 name=self._t.transform("modules.base.user.atr.warnings"), | ||||
|                 value=self._t.transform("common.not_implemented_yet"), | ||||
|                 value=warnings_string, | ||||
|                 inline=False, | ||||
|             ) | ||||
|  | ||||
| @@ -341,3 +351,72 @@ class UserGroup(DiscordCommandABC): | ||||
|         self, interaction: discord.Interaction, current: str | ||||
|     ) -> List[app_commands.Choice[str]]: | ||||
|         return [app_commands.Choice(name=value, value=key) for key, value in self._atr_list] | ||||
|  | ||||
|     @user.group() | ||||
|     @commands.guild_only() | ||||
|     async def warning(self, ctx: Context): | ||||
|         pass | ||||
|  | ||||
|     @warning.command() | ||||
|     @commands.guild_only() | ||||
|     @CommandChecks.check_is_ready() | ||||
|     @CommandChecks.check_is_member_moderator() | ||||
|     async def show(self, ctx: Context, member: discord.Member, wait: int = None): | ||||
|         self._logger.debug(__name__, f"Received command user warning show {ctx}:{member}") | ||||
|  | ||||
|         server = self._servers.find_server_by_discord_id(ctx.guild.id) | ||||
|         user = self._users.find_user_by_discord_id_and_server_id(member.id, server.id) | ||||
|  | ||||
|         embed = discord.Embed( | ||||
|             title=member.name, description=self._t.transform("modules.base.user.atr.warnings"), color=int("ef9d0d", 16) | ||||
|         ) | ||||
|  | ||||
|         warnings = self._user_warnings.get_user_warnings_by_user_id(user.id) | ||||
|         warnings_id_string = "" | ||||
|         for warning in warnings: | ||||
|             warnings_id_string += f"{warning.id}\n" | ||||
|  | ||||
|         warnings_description_string = "" | ||||
|         for warning in warnings: | ||||
|             warnings_description_string += f"{warning.description}\n" | ||||
|  | ||||
|         embed.add_field( | ||||
|             name=self._t.transform("modules.base.warnings.show.id"), | ||||
|             value=warnings_id_string, | ||||
|             inline=True, | ||||
|         ) | ||||
|         embed.add_field( | ||||
|             name=self._t.transform("modules.base.warnings.show.description"), | ||||
|             value=warnings_description_string, | ||||
|             inline=True, | ||||
|         ) | ||||
|         await self._message_service.send_interaction_msg(ctx.interaction, embed, wait_before_delete=wait) | ||||
|         self._logger.trace(__name__, f"Finished user warning show command") | ||||
|  | ||||
|     @warning.command() | ||||
|     @commands.guild_only() | ||||
|     @CommandChecks.check_is_ready() | ||||
|     @CommandChecks.check_is_member_moderator() | ||||
|     async def add(self, ctx: Context, member: discord.Member, description: str): | ||||
|         self._logger.debug(__name__, f"Received command user warning add {ctx}:{member},{description}") | ||||
|         try: | ||||
|             await self._user_warnings_service.add_warnings(member, description, ctx.author.id) | ||||
|             await self._message_service.send_ctx_msg(ctx, self._t.transform("modules.base.warnings.add.success")) | ||||
|         except Exception as e: | ||||
|             self._logger.error(__name__, f"Adding user warning failed", e) | ||||
|             await self._message_service.send_ctx_msg(ctx, self._t.transform("modules.base.warnings.add.failed")) | ||||
|         self._logger.trace(__name__, f"Finished user warning add command") | ||||
|  | ||||
|     @warning.command() | ||||
|     @commands.guild_only() | ||||
|     @CommandChecks.check_is_ready() | ||||
|     @CommandChecks.check_is_member_moderator() | ||||
|     async def remove(self, ctx: Context, warning_id: int): | ||||
|         self._logger.debug(__name__, f"Received command user warning remove {ctx}:{warning_id}") | ||||
|         try: | ||||
|             await self._user_warnings_service.remove_warnings(warning_id) | ||||
|             await self._message_service.send_ctx_msg(ctx, self._t.transform("modules.base.warnings.remove.success")) | ||||
|         except Exception as e: | ||||
|             self._logger.error(__name__, f"Removing user warning failed", e) | ||||
|             await self._message_service.send_ctx_msg(ctx, self._t.transform("modules.base.warnings.remove.failed")) | ||||
|         self._logger.trace(__name__, f"Finished user warning remove command") | ||||
|   | ||||
| @@ -20,6 +20,7 @@ class BaseServerSettings(ConfigurationModelABC): | ||||
|         self._afk_command_channel_id: int = 0 | ||||
|         self._help_command_reference_url: str = "" | ||||
|         self._help_voice_channel_id: int = 0 | ||||
|         self._team_channel_id: int = 0 | ||||
|         self._ping_urls = List(str) | ||||
|  | ||||
|     @property | ||||
| @@ -62,6 +63,10 @@ class BaseServerSettings(ConfigurationModelABC): | ||||
|     def help_command_reference_url(self) -> str: | ||||
|         return self._help_command_reference_url | ||||
|  | ||||
|     @property | ||||
|     def team_channel_id(self) -> int: | ||||
|         return self._team_channel_id | ||||
|  | ||||
|     @property | ||||
|     def help_voice_channel_id(self) -> int: | ||||
|         return self._help_voice_channel_id | ||||
| @@ -86,6 +91,7 @@ class BaseServerSettings(ConfigurationModelABC): | ||||
|             self._afk_command_channel_id = settings["AFKCommandChannelId"] | ||||
|             self._help_command_reference_url = settings["HelpCommandReferenceUrl"] | ||||
|             self._help_voice_channel_id = settings["HelpVoiceChannelId"] | ||||
|             self._team_channel_id = settings["TeamChannelId"] | ||||
|             for url in settings["PingURLs"]: | ||||
|                 self._ping_urls.append(url) | ||||
|         except Exception as e: | ||||
|   | ||||
| @@ -57,4 +57,4 @@ class BaseOnVoiceStateUpdateEventHelpChannel(OnVoiceStateUpdateABC): | ||||
|                 a, | ||||
|             ) | ||||
|  | ||||
|             self._logger.debug(__name__, f"Module {type(self)} stopped") | ||||
|         self._logger.debug(__name__, f"Module {type(self)} stopped") | ||||
|   | ||||
							
								
								
									
										133
									
								
								kdb-bot/src/modules/base/service/user_warnings_service.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										133
									
								
								kdb-bot/src/modules/base/service/user_warnings_service.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,133 @@ | ||||
| import discord | ||||
| from cpl_core.database.context import DatabaseContextABC | ||||
| from cpl_core.logging import LoggerABC | ||||
| from cpl_discord.service import DiscordBotServiceABC | ||||
| from cpl_translation import TranslatePipe | ||||
|  | ||||
| from bot_core.abc.message_service_abc import MessageServiceABC | ||||
| from bot_data.abc.level_repository_abc import LevelRepositoryABC | ||||
| from bot_data.abc.server_repository_abc import ServerRepositoryABC | ||||
| from bot_data.abc.user_repository_abc import UserRepositoryABC | ||||
| from bot_data.abc.user_warnings_repository_abc import UserWarningsRepositoryABC | ||||
| from bot_data.model.user import User | ||||
| from bot_data.model.user_warnings import UserWarnings | ||||
| from modules.base.abc.base_helper_abc import BaseHelperABC | ||||
| from modules.base.configuration.base_server_settings import BaseServerSettings | ||||
| from modules.level.service.level_service import LevelService | ||||
| from modules.permission.abc.permission_service_abc import PermissionServiceABC | ||||
|  | ||||
|  | ||||
| class UserWarningsService: | ||||
|     def __init__( | ||||
|         self, | ||||
|         logger: LoggerABC, | ||||
|         db: DatabaseContextABC, | ||||
|         bot: DiscordBotServiceABC, | ||||
|         warnings: UserWarningsRepositoryABC, | ||||
|         users: UserRepositoryABC, | ||||
|         servers: ServerRepositoryABC, | ||||
|         levels: LevelRepositoryABC, | ||||
|         level_service: LevelService, | ||||
|         message_service: MessageServiceABC, | ||||
|         t: TranslatePipe, | ||||
|         permissions: PermissionServiceABC, | ||||
|         base_helper: BaseHelperABC, | ||||
|     ): | ||||
|         self._logger = logger | ||||
|         self._db = db | ||||
|         self._bot = bot | ||||
|         self._warnings = warnings | ||||
|         self._users = users | ||||
|         self._servers = servers | ||||
|         self._levels = levels | ||||
|         self._level_service = level_service | ||||
|         self._message_service = message_service | ||||
|         self._t = t | ||||
|         self._permissions = permissions | ||||
|         self._base_helper = base_helper | ||||
|  | ||||
|     async def notify_team(self, member: discord.Member, description: str, removed=False): | ||||
|         try: | ||||
|             settings: BaseServerSettings = self._base_helper.get_config(member.guild.id) | ||||
|             channel = member.guild.get_channel(settings.team_channel_id) | ||||
|             if removed: | ||||
|                 translation = self._t.transform("modules.base.warnings.team_removed").format( | ||||
|                     description, member.mention | ||||
|                 ) | ||||
|             else: | ||||
|                 translation = self._t.transform("modules.base.warnings.team_warned").format(member.mention, description) | ||||
|  | ||||
|             self._bot.loop.create_task(self._message_service.send_channel_message(channel, translation)) | ||||
|         except Exception as e: | ||||
|             self._logger.error(__name__, f"Team notification for user warning failed!", e) | ||||
|  | ||||
|     async def notify_user(self, member: discord.Member, message: str): | ||||
|         try: | ||||
|             # run as task to keep the interaction alive | ||||
|             self._bot.loop.create_task(self._message_service.send_dm_message(message, member)) | ||||
|         except Exception as e: | ||||
|             self._logger.error(__name__, f"User notification for user warning failed!", e) | ||||
|  | ||||
|     async def check_for_warnings(self, member: discord.Member, user: User): | ||||
|         existing_warnings = self._warnings.get_user_warnings_by_user_id(user.id) | ||||
|  | ||||
|         if existing_warnings.count() == 1: | ||||
|             await self.notify_user(member, self._t.transform("modules.base.warnings.first")) | ||||
|  | ||||
|         elif existing_warnings.count() == 2: | ||||
|             server = self._servers.get_server_by_discord_id(member.guild.id) | ||||
|             user = self._users.get_user_by_discord_id_and_server_id(member.id, server.id) | ||||
|             level = self._level_service.get_level(user) | ||||
|             levels = self._levels.get_levels_by_server_id(server.id).order_by(lambda l: l.min_xp) | ||||
|  | ||||
|             new_level = levels.where(lambda l: l.min_xp < level.min_xp).last_or_default() | ||||
|             if new_level is not None: | ||||
|                 user.xp = new_level.min_xp | ||||
|                 self._users.update_user(user) | ||||
|                 self._db.save_changes() | ||||
|             await self.notify_user(member, self._t.transform("modules.base.warnings.second")) | ||||
|  | ||||
|         elif existing_warnings.count() == 3: | ||||
|             server = self._servers.get_server_by_discord_id(member.guild.id) | ||||
|             user = self._users.get_user_by_discord_id_and_server_id(member.id, server.id) | ||||
|             levels = self._levels.get_levels_by_server_id(server.id) | ||||
|  | ||||
|             new_level = levels.where(lambda l: l.min_xp > 0).order_by(lambda l: l.min_xp).last_or_default() | ||||
|             if new_level is not None: | ||||
|                 user.xp = new_level.min_xp | ||||
|                 self._users.update_user(user) | ||||
|                 self._db.save_changes() | ||||
|             await self.notify_user(member, self._t.transform("modules.base.warnings.third")) | ||||
|  | ||||
|         elif existing_warnings.count() >= 4: | ||||
|             user.xp = 0 | ||||
|             self._users.update_user(user) | ||||
|             self._db.save_changes() | ||||
|             await self.notify_team(member, self._t.transform("modules.base.warnings.kick").format(member.mention)) | ||||
|             await member.kick() | ||||
|  | ||||
|     async def add_warnings(self, member: discord.Member, description: str, author_id: int = None): | ||||
|         server = self._servers.get_server_by_discord_id(member.guild.id) | ||||
|         user = self._users.get_user_by_discord_id_and_server_id(member.id, server.id) | ||||
|  | ||||
|         author = None | ||||
|         if author_id is not None: | ||||
|             author = self._users.get_user_by_discord_id_and_server_id(author_id, server.id) | ||||
|  | ||||
|         warning = UserWarnings(description, user, author) | ||||
|         self._warnings.add_user_warnings(warning) | ||||
|         self._db.save_changes() | ||||
|         await self.notify_user(member, self._t.transform("modules.base.warnings.warned").format(warning.description)) | ||||
|         await self.notify_team(member, warning.description) | ||||
|         await self.check_for_warnings(member, user) | ||||
|  | ||||
|     async def remove_warnings(self, id: int): | ||||
|         warning = self._warnings.get_user_warnings_by_id(id) | ||||
|         self._warnings.delete_user_warnings(warning) | ||||
|         self._db.save_changes() | ||||
|  | ||||
|         guild = self._bot.get_guild(warning.user.server.discord_id) | ||||
|         member = guild.get_member(warning.user.discord_id) | ||||
|  | ||||
|         await self.notify_user(member, self._t.transform("modules.base.warnings.removed").format(warning.description)) | ||||
|         await self.notify_team(member, warning.description, removed=True) | ||||
		Reference in New Issue
	
	Block a user