Improved permission service #393
This commit is contained in:
		@@ -24,3 +24,4 @@ class FeatureFlagsEnum(Enum):
 | 
			
		||||
    game_server = "GameServer"
 | 
			
		||||
    sync_xp = "SyncXp"
 | 
			
		||||
    short_role_name = "ShortRoleName"
 | 
			
		||||
    technician_full_access = "TechnicianFullAccess"
 | 
			
		||||
 
 | 
			
		||||
@@ -26,6 +26,7 @@ class FeatureFlagsSettings(ConfigurationModelABC):
 | 
			
		||||
        FeatureFlagsEnum.game_server.value: False,  # 25.09.2023 #366
 | 
			
		||||
        FeatureFlagsEnum.sync_xp.value: False,  # 25.09.2023 #366
 | 
			
		||||
        FeatureFlagsEnum.short_role_name.value: False,  # 28.09.2023 #378
 | 
			
		||||
        FeatureFlagsEnum.technician_full_access.value: False,  # 28.09.2023 #393
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    def __init__(self, **kwargs: dict):
 | 
			
		||||
 
 | 
			
		||||
@@ -8,42 +8,6 @@ class PermissionServiceABC(ABC):
 | 
			
		||||
    def __init__(self):
 | 
			
		||||
        pass
 | 
			
		||||
 | 
			
		||||
    @abstractmethod
 | 
			
		||||
    def on_ready(self):
 | 
			
		||||
        pass
 | 
			
		||||
 | 
			
		||||
    @abstractmethod
 | 
			
		||||
    def on_member_update(self, before: discord.Member, after: discord.Member):
 | 
			
		||||
        pass
 | 
			
		||||
 | 
			
		||||
    @abstractmethod
 | 
			
		||||
    def get_admin_role_ids(self, g_id: int) -> list[int]:
 | 
			
		||||
        pass
 | 
			
		||||
 | 
			
		||||
    @abstractmethod
 | 
			
		||||
    def get_admin_roles(self, g_id: int) -> list[discord.Role]:
 | 
			
		||||
        pass
 | 
			
		||||
 | 
			
		||||
    @abstractmethod
 | 
			
		||||
    def get_admins(self, g_id: int) -> list[discord.Member]:
 | 
			
		||||
        pass
 | 
			
		||||
 | 
			
		||||
    @abstractmethod
 | 
			
		||||
    def get_moderator_role_ids(self, g_id: int) -> list[int]:
 | 
			
		||||
        pass
 | 
			
		||||
 | 
			
		||||
    @abstractmethod
 | 
			
		||||
    def get_moderator_roles(self, g_id: int) -> list[discord.Role]:
 | 
			
		||||
        pass
 | 
			
		||||
 | 
			
		||||
    @abstractmethod
 | 
			
		||||
    def get_moderators(self, g_id: int) -> list[discord.Member]:
 | 
			
		||||
        pass
 | 
			
		||||
 | 
			
		||||
    @abstractmethod
 | 
			
		||||
    def get_technicians(self) -> list[discord.Member]:
 | 
			
		||||
        pass
 | 
			
		||||
 | 
			
		||||
    @abstractmethod
 | 
			
		||||
    def is_member_admin(self, member: discord.Member) -> bool:
 | 
			
		||||
        pass
 | 
			
		||||
 
 | 
			
		||||
@@ -1,26 +0,0 @@
 | 
			
		||||
# -*- coding: utf-8 -*-
 | 
			
		||||
 | 
			
		||||
"""
 | 
			
		||||
bot sh-edraft.de Discord bot
 | 
			
		||||
~~~~~~~~~~~~~~~~~~~
 | 
			
		||||
 | 
			
		||||
Discord bot for customers of sh-edraft.de
 | 
			
		||||
 | 
			
		||||
:copyright: (c) 2022 - 2023 sh-edraft.de
 | 
			
		||||
:license: MIT, see LICENSE for more details.
 | 
			
		||||
 | 
			
		||||
"""
 | 
			
		||||
 | 
			
		||||
__title__ = "modules.permission.events"
 | 
			
		||||
__author__ = "Sven Heidemann"
 | 
			
		||||
__license__ = "MIT"
 | 
			
		||||
__copyright__ = "Copyright (c) 2022 - 2023 sh-edraft.de"
 | 
			
		||||
__version__ = "1.1.9"
 | 
			
		||||
 | 
			
		||||
from collections import namedtuple
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# imports:
 | 
			
		||||
 | 
			
		||||
VersionInfo = namedtuple("VersionInfo", "major minor micro")
 | 
			
		||||
version_info = VersionInfo(major="1", minor="1", micro="9")
 | 
			
		||||
@@ -1,30 +0,0 @@
 | 
			
		||||
import discord
 | 
			
		||||
from cpl_core.configuration import ConfigurationABC
 | 
			
		||||
from cpl_core.logging import LoggerABC
 | 
			
		||||
from cpl_discord.events import OnMemberUpdateABC
 | 
			
		||||
 | 
			
		||||
from bot_core.configuration.feature_flags_enum import FeatureFlagsEnum
 | 
			
		||||
from bot_core.configuration.feature_flags_settings import FeatureFlagsSettings
 | 
			
		||||
from bot_data.model.server_config import ServerConfig
 | 
			
		||||
from modules.permission.abc.permission_service_abc import PermissionServiceABC
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class PermissionOnMemberUpdateEvent(OnMemberUpdateABC):
 | 
			
		||||
    def __init__(self, config: ConfigurationABC, logger: LoggerABC, permission_service: PermissionServiceABC):
 | 
			
		||||
        OnMemberUpdateABC.__init__(self)
 | 
			
		||||
        self._config = config
 | 
			
		||||
        self._logger = logger
 | 
			
		||||
        self._permission_service = permission_service
 | 
			
		||||
 | 
			
		||||
    async def on_member_update(self, before: discord.Member, after: discord.Member):
 | 
			
		||||
        if before.guild is not None:
 | 
			
		||||
            server_config: ServerConfig = self._config.get_configuration(f"ServerConfig_{before.guild.id}")
 | 
			
		||||
            if not FeatureFlagsSettings.get_flag_from_dict(
 | 
			
		||||
                server_config.feature_flags, FeatureFlagsEnum.permission_module
 | 
			
		||||
            ):
 | 
			
		||||
                return
 | 
			
		||||
 | 
			
		||||
        self._logger.debug(__name__, f"Module {type(self)} started")
 | 
			
		||||
 | 
			
		||||
        if before.roles != after.roles:
 | 
			
		||||
            self._permission_service.on_member_update(before, after)
 | 
			
		||||
@@ -1,22 +0,0 @@
 | 
			
		||||
from cpl_core.logging import LoggerABC
 | 
			
		||||
from cpl_discord.events import OnReadyABC
 | 
			
		||||
 | 
			
		||||
from bot_core.configuration.feature_flags_enum import FeatureFlagsEnum
 | 
			
		||||
from bot_core.configuration.feature_flags_settings import FeatureFlagsSettings
 | 
			
		||||
from bot_data.model.technician_config import TechnicianConfig
 | 
			
		||||
from modules.permission.abc.permission_service_abc import PermissionServiceABC
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class PermissionOnReadyEvent(OnReadyABC):
 | 
			
		||||
    def __init__(self, logger: LoggerABC, permission_service: PermissionServiceABC, tech_config: TechnicianConfig):
 | 
			
		||||
        OnReadyABC.__init__(self)
 | 
			
		||||
        self._logger = logger
 | 
			
		||||
        self._permission_service = permission_service
 | 
			
		||||
        self._tech_config = tech_config
 | 
			
		||||
 | 
			
		||||
    async def on_ready(self):
 | 
			
		||||
        if not FeatureFlagsSettings.get_flag_from_dict(
 | 
			
		||||
            self._tech_config.feature_flags, FeatureFlagsEnum.permission_module
 | 
			
		||||
        ):
 | 
			
		||||
            return
 | 
			
		||||
        self._permission_service.on_ready()
 | 
			
		||||
@@ -1,16 +1,11 @@
 | 
			
		||||
from cpl_core.configuration import ConfigurationABC
 | 
			
		||||
from cpl_core.dependency_injection import ServiceCollectionABC
 | 
			
		||||
from cpl_core.environment import ApplicationEnvironmentABC
 | 
			
		||||
from cpl_discord.discord_event_types_enum import DiscordEventTypesEnum
 | 
			
		||||
from cpl_discord.service.discord_collection_abc import DiscordCollectionABC
 | 
			
		||||
 | 
			
		||||
from bot_core.abc.module_abc import ModuleABC
 | 
			
		||||
from bot_core.configuration.feature_flags_enum import FeatureFlagsEnum
 | 
			
		||||
from modules.permission.abc.permission_service_abc import PermissionServiceABC
 | 
			
		||||
from modules.permission.events.permission_on_member_update_event import (
 | 
			
		||||
    PermissionOnMemberUpdateEvent,
 | 
			
		||||
)
 | 
			
		||||
from modules.permission.events.permission_on_ready_event import PermissionOnReadyEvent
 | 
			
		||||
from modules.permission.service.permission_service import PermissionService
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@@ -22,8 +17,6 @@ class PermissionModule(ModuleABC):
 | 
			
		||||
        pass
 | 
			
		||||
 | 
			
		||||
    def configure_services(self, services: ServiceCollectionABC, env: ApplicationEnvironmentABC):
 | 
			
		||||
        services.add_singleton(PermissionServiceABC, PermissionService)
 | 
			
		||||
        services.add_transient(PermissionServiceABC, PermissionService)
 | 
			
		||||
        # commands
 | 
			
		||||
        # events
 | 
			
		||||
        self._dc.add_event(DiscordEventTypesEnum.on_ready.value, PermissionOnReadyEvent)
 | 
			
		||||
        self._dc.add_event(DiscordEventTypesEnum.on_member_update.value, PermissionOnMemberUpdateEvent)
 | 
			
		||||
 
 | 
			
		||||
@@ -3,146 +3,70 @@ from cpl_core.configuration import ConfigurationABC
 | 
			
		||||
from cpl_core.logging import LoggerABC
 | 
			
		||||
from cpl_discord.service import DiscordBotServiceABC
 | 
			
		||||
 | 
			
		||||
from bot_data.model.server_config import ServerConfig
 | 
			
		||||
from bot_data.abc.server_config_repository_abc import ServerConfigRepositoryABC
 | 
			
		||||
from bot_data.abc.server_repository_abc import ServerRepositoryABC
 | 
			
		||||
from bot_data.abc.technician_config_repository_abc import TechnicianConfigRepositoryABC
 | 
			
		||||
from bot_data.model.team_member_type_enum import TeamMemberTypeEnum
 | 
			
		||||
from bot_data.model.technician_config import TechnicianConfig
 | 
			
		||||
from modules.permission.abc.permission_service_abc import PermissionServiceABC
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class PermissionService(PermissionServiceABC):
 | 
			
		||||
    def __init__(
 | 
			
		||||
        self,
 | 
			
		||||
        technician_settings: TechnicianConfig,
 | 
			
		||||
        logger: LoggerABC,
 | 
			
		||||
        bot: DiscordBotServiceABC,
 | 
			
		||||
        config: ConfigurationABC,
 | 
			
		||||
        servers: ServerRepositoryABC,
 | 
			
		||||
        server_configs: ServerConfigRepositoryABC,
 | 
			
		||||
        technician_configs: TechnicianConfigRepositoryABC,
 | 
			
		||||
    ):
 | 
			
		||||
        PermissionServiceABC.__init__(self)
 | 
			
		||||
        self._logger = logger
 | 
			
		||||
        self._bot = bot
 | 
			
		||||
        self._config = config
 | 
			
		||||
        self._technician_settings = technician_settings
 | 
			
		||||
        self._servers = servers
 | 
			
		||||
        self._server_configs = server_configs
 | 
			
		||||
        self._technician_configs = technician_configs
 | 
			
		||||
 | 
			
		||||
        self._admin_role_ids: dict[int, list[int]] = {}
 | 
			
		||||
        self._admin_roles: dict[int, list[discord.Role]] = {}
 | 
			
		||||
        self._admins: dict[int, list[discord.Member]] = {}
 | 
			
		||||
    def _has_member_role(self, member: discord.Member, team_member_type: TeamMemberTypeEnum) -> bool:
 | 
			
		||||
        self._logger.debug(__name__, f"Checking permissions for {member.name}")
 | 
			
		||||
        if member is None or member.guild is None:
 | 
			
		||||
            return False
 | 
			
		||||
 | 
			
		||||
        self._moderator_role_ids: dict[int, list[int]] = {}
 | 
			
		||||
        self._moderator_roles: dict[int, list[discord.Role]] = {}
 | 
			
		||||
        self._moderators: dict[int, list[discord.Member]] = {}
 | 
			
		||||
 | 
			
		||||
        self._technician_ids: list[int] = technician_settings.technician_ids.to_list()
 | 
			
		||||
        self._technicians: list[discord.Member] = []
 | 
			
		||||
 | 
			
		||||
    def on_ready(self):
 | 
			
		||||
        for guild in self._bot.guilds:
 | 
			
		||||
            guild: discord.Guild = guild
 | 
			
		||||
            self._logger.debug(__name__, f"Validate permission settings")
 | 
			
		||||
 | 
			
		||||
            for technician_id in self._technician_ids:
 | 
			
		||||
                technician = guild.get_member(technician_id)
 | 
			
		||||
                if technician is None:
 | 
			
		||||
        try:
 | 
			
		||||
            server = self._servers.get_server_by_discord_id(member.guild.id)
 | 
			
		||||
            config = self._server_configs.get_server_config_by_server(server.id)
 | 
			
		||||
            roles = config.team_role_ids.where(lambda x: x.team_member_type == team_member_type).select(
 | 
			
		||||
                lambda x: member.guild.get_role(x.role_id)
 | 
			
		||||
            )
 | 
			
		||||
            for role in roles:
 | 
			
		||||
                if role not in member.roles:
 | 
			
		||||
                    continue
 | 
			
		||||
                self._technicians.append(technician)
 | 
			
		||||
 | 
			
		||||
            settings: ServerConfig = self._config.get_configuration(f"ServerConfig_{guild.id}")
 | 
			
		||||
            if settings is None:
 | 
			
		||||
                self._logger.error(__name__, "Server settings not found")
 | 
			
		||||
                return
 | 
			
		||||
                return True
 | 
			
		||||
        except Exception as e:
 | 
			
		||||
            self._logger.error(__name__, "Permission check failed", e)
 | 
			
		||||
 | 
			
		||||
            self._admin_role_ids[guild.id] = (
 | 
			
		||||
                settings.team_role_ids.where(lambda x: x.team_member_type == TeamMemberTypeEnum.admin)
 | 
			
		||||
                .select(lambda x: x.role_id)
 | 
			
		||||
                .to_list()
 | 
			
		||||
            )
 | 
			
		||||
            self._moderator_role_ids[guild.id] = (
 | 
			
		||||
                settings.team_role_ids.where(lambda x: x.team_member_type == TeamMemberTypeEnum.moderator)
 | 
			
		||||
                .select(lambda x: x.role_id)
 | 
			
		||||
                .to_list()
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
            admin_roles = []
 | 
			
		||||
            admins = []
 | 
			
		||||
 | 
			
		||||
            mod_roles = []
 | 
			
		||||
            mods = []
 | 
			
		||||
 | 
			
		||||
            for role in guild.roles:
 | 
			
		||||
                role: discord.Role = role
 | 
			
		||||
 | 
			
		||||
                if role.id in self._admin_role_ids[guild.id]:
 | 
			
		||||
                    admin_roles.append(role)
 | 
			
		||||
                    self._logger.trace(__name__, f"Added admin role {role}")
 | 
			
		||||
 | 
			
		||||
                    for member in role.members:
 | 
			
		||||
                        admins.append(member)
 | 
			
		||||
                        self._logger.trace(__name__, f"Added admin {member}")
 | 
			
		||||
 | 
			
		||||
                if role.id in self._moderator_role_ids[guild.id]:
 | 
			
		||||
                    mod_roles.append(role)
 | 
			
		||||
                    self._logger.trace(__name__, f"Added moderator role {role}")
 | 
			
		||||
 | 
			
		||||
                    for member in role.members:
 | 
			
		||||
                        mods.append(member)
 | 
			
		||||
                        self._logger.trace(__name__, f"Added moderator {member}")
 | 
			
		||||
 | 
			
		||||
            self._admin_roles[guild.id] = admin_roles
 | 
			
		||||
            self._admins[guild.id] = admins
 | 
			
		||||
            self._moderator_roles[guild.id] = mod_roles
 | 
			
		||||
            self._moderators[guild.id] = mods
 | 
			
		||||
 | 
			
		||||
    def on_member_update(self, before: discord.Member, after: discord.Member):
 | 
			
		||||
        g_id = after.guild.id
 | 
			
		||||
 | 
			
		||||
        for admin_role in self._admin_roles[g_id]:
 | 
			
		||||
            if admin_role in before.roles and admin_role not in after.roles:
 | 
			
		||||
                self._admins[g_id].remove(after)
 | 
			
		||||
                self._logger.trace(__name__, f"Removed {after.id} from admins")
 | 
			
		||||
 | 
			
		||||
            elif admin_role in after.roles and admin_role not in before.roles:
 | 
			
		||||
                self._admins[g_id].append(after)
 | 
			
		||||
                self._logger.trace(__name__, f"Added {after.id} to admins")
 | 
			
		||||
 | 
			
		||||
        for moderator_role in self._moderator_roles[g_id]:
 | 
			
		||||
            if moderator_role in before.roles and moderator_role not in after.roles:
 | 
			
		||||
                self._moderators[g_id].remove(after)
 | 
			
		||||
                self._logger.trace(__name__, f"Removed {after.id} from moderators")
 | 
			
		||||
 | 
			
		||||
            elif moderator_role in after.roles and moderator_role not in before.roles:
 | 
			
		||||
                self._moderators[g_id].append(after)
 | 
			
		||||
                self._logger.trace(__name__, f"Added {after.id} to moderators")
 | 
			
		||||
 | 
			
		||||
    def get_admin_role_ids(self, g_id: int) -> list[int]:
 | 
			
		||||
        return self._admin_role_ids[g_id]
 | 
			
		||||
 | 
			
		||||
    def get_admin_roles(self, g_id: int) -> list[discord.Role]:
 | 
			
		||||
        return self._admin_roles[g_id]
 | 
			
		||||
 | 
			
		||||
    def get_admins(self, g_id: int) -> list[discord.Member]:
 | 
			
		||||
        return self._admins[g_id]
 | 
			
		||||
 | 
			
		||||
    def get_moderator_role_ids(self, g_id: int) -> list[int]:
 | 
			
		||||
        return self._moderator_role_ids[g_id]
 | 
			
		||||
 | 
			
		||||
    def get_moderator_roles(self, g_id: int) -> list[discord.Role]:
 | 
			
		||||
        return self._moderator_roles[g_id]
 | 
			
		||||
 | 
			
		||||
    def get_moderators(self, g_id: int) -> list[discord.Member]:
 | 
			
		||||
        return self._moderators[g_id]
 | 
			
		||||
 | 
			
		||||
    def get_technicians(self) -> list[discord.Member]:
 | 
			
		||||
        return self._technicians
 | 
			
		||||
        return False
 | 
			
		||||
 | 
			
		||||
    def is_member_admin(self, member: discord.Member) -> bool:
 | 
			
		||||
        return member is not None and member.guild.id in self._admins and member in self._admins[member.guild.id]
 | 
			
		||||
        return self._has_member_role(member, TeamMemberTypeEnum.admin)
 | 
			
		||||
 | 
			
		||||
    def is_member_moderator(self, member: discord.Member) -> bool:
 | 
			
		||||
        return (
 | 
			
		||||
            member is not None
 | 
			
		||||
            and member.guild.id in self._moderators
 | 
			
		||||
            and member in self._moderators[member.guild.id]
 | 
			
		||||
            or self.is_member_admin(member)
 | 
			
		||||
        return self._has_member_role(member, TeamMemberTypeEnum.moderator) or self._has_member_role(
 | 
			
		||||
            member, TeamMemberTypeEnum.admin
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    def is_member_technician(self, member: discord.Member) -> bool:
 | 
			
		||||
        return member is not None and member in self._technicians
 | 
			
		||||
        self._logger.debug(__name__, f"Checking is member {member.name} technician")
 | 
			
		||||
        if member is None or member.guild is None:
 | 
			
		||||
            return False
 | 
			
		||||
 | 
			
		||||
        try:
 | 
			
		||||
            tech_config = self._technician_configs.get_technician_config()
 | 
			
		||||
            if member.id in tech_config.technician_ids:
 | 
			
		||||
                return True
 | 
			
		||||
        except Exception as e:
 | 
			
		||||
            self._logger.error(__name__, "Permission check failed", e)
 | 
			
		||||
 | 
			
		||||
        return False
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user