0.3.1 #187

Merged
edraft merged 46 commits from 0.3.1 into master 2023-01-14 10:39:27 +01:00
252 changed files with 6085 additions and 3791 deletions
Showing only changes of commit e2b2fb5abb - Show all commits

View File

@ -25,8 +25,8 @@
"gv": "cpl get-version",
"get-version": "export VERSION=$(cpl run get-version --dev); echo $VERSION;",
"pre-build": "cpl set-version $ARGS",
"post-build": "cpl run post-build --dev",
"pre-build": "cpl set-version $ARGS; black src;",
"post-build": "cpl run post-build --dev; black src;",
"pre-prod": "cpl build",
"prod": "export KDB_ENVIRONMENT=production; export KDB_NAME=KDB-Prod; cpl start;",

View File

@ -11,16 +11,16 @@ Discord bot for the Keksdose discord Server
"""
__title__ = 'bot'
__author__ = 'Sven Heidemann'
__license__ = 'MIT'
__copyright__ = 'Copyright (c) 2022 sh-edraft.de'
__version__ = '0.3.0'
__title__ = "bot"
__author__ = "Sven Heidemann"
__license__ = "MIT"
__copyright__ = "Copyright (c) 2022 sh-edraft.de"
__version__ = "0.3.0"
from collections import namedtuple
# imports:
# imports:
VersionInfo = namedtuple('VersionInfo', 'major minor micro')
version_info = VersionInfo(major='0', minor='3', micro='0')
VersionInfo = namedtuple("VersionInfo", "major minor micro")
version_info = VersionInfo(major="0", minor="3", micro="0")

View File

@ -13,7 +13,6 @@ from bot_core.configuration.feature_flags_settings import FeatureFlagsSettings
class Application(DiscordBotApplicationABC):
def __init__(self, config: ConfigurationABC, services: ServiceProviderABC):
DiscordBotApplicationABC.__init__(self, config, services)
@ -24,12 +23,18 @@ class Application(DiscordBotApplicationABC):
self._logger: LoggerABC = services.get_service(LoggerABC)
# cpl-discord
self._bot: DiscordBotServiceABC = services.get_service(DiscordBotServiceABC)
self._bot_settings: DiscordBotSettings = config.get_configuration(DiscordBotSettings)
self._bot_settings: DiscordBotSettings = config.get_configuration(
DiscordBotSettings
)
# cpl-translation
self._translation: TranslationServiceABC = services.get_service(TranslationServiceABC)
self._translation: TranslationServiceABC = services.get_service(
TranslationServiceABC
)
self._t: TranslatePipe = services.get_service(TranslatePipe)
self._feature_flags: FeatureFlagsSettings = config.get_configuration(FeatureFlagsSettings)
self._feature_flags: FeatureFlagsSettings = config.get_configuration(
FeatureFlagsSettings
)
# api
if self._feature_flags.get_flag(FeatureFlagsEnum.api_module):
@ -38,24 +43,28 @@ class Application(DiscordBotApplicationABC):
self._is_stopping = False
async def configure(self):
self._translation.load_by_settings(self._configuration.get_configuration(TranslationSettings))
self._translation.load_by_settings(
self._configuration.get_configuration(TranslationSettings)
)
async def main(self):
try:
self._logger.debug(__name__, f'Starting...')
self._logger.debug(__name__, f"Starting...")
if self._feature_flags.get_flag(FeatureFlagsEnum.api_module) and \
self._feature_flags.get_flag(FeatureFlagsEnum.api_only) and \
self._environment.environment_name == 'development':
if (
self._feature_flags.get_flag(FeatureFlagsEnum.api_module)
and self._feature_flags.get_flag(FeatureFlagsEnum.api_only)
and self._environment.environment_name == "development"
):
self._api.start()
self._api.join()
return
self._logger.trace(__name__, f'Try to start {DiscordBotService.__name__}')
self._logger.trace(__name__, f"Try to start {DiscordBotService.__name__}")
await self._bot.start_async()
await self._bot.stop_async()
except Exception as e:
self._logger.error(__name__, 'Start failed', e)
self._logger.error(__name__, "Start failed", e)
async def stop_async(self):
if self._is_stopping:
@ -63,13 +72,17 @@ class Application(DiscordBotApplicationABC):
self._is_stopping = True
try:
self._logger.trace(__name__, f'Try to stop {DiscordBotService.__name__}')
self._logger.trace(__name__, f"Try to stop {DiscordBotService.__name__}")
await self._bot.close()
self._logger.trace(__name__, f'Stopped {DiscordBotService.__name__}')
self._logger.trace(__name__, f"Stopped {DiscordBotService.__name__}")
except Exception as e:
self._logger.error(__name__, 'stop failed', e)
self._logger.error(__name__, "stop failed", e)
Console.write_line()
def is_restart(self):
return True if self._configuration.get_configuration('IS_RESTART') == 'true' else False #
return (
True
if self._configuration.get_configuration("IS_RESTART") == "true"
else False
) #

View File

@ -0,0 +1,26 @@
# -*- coding: utf-8 -*-
"""
bot Keksdose bot
~~~~~~~~~~~~~~~~~~~
Discord bot for the Keksdose discord Server
:copyright: (c) 2022 sh-edraft.de
:license: MIT, see LICENSE for more details.
"""
__title__ = "bot.extension"
__author__ = "Sven Heidemann"
__license__ = "MIT"
__copyright__ = "Copyright (c) 2022 sh-edraft.de"
__version__ = "0.3.0"
from collections import namedtuple
# imports:
VersionInfo = namedtuple("VersionInfo", "major minor micro")
version_info = VersionInfo(major="0", minor="3", micro="0")

View File

@ -7,7 +7,6 @@ from bot_core.configuration.bot_settings import BotSettings
class InitBotExtension(ApplicationExtensionABC):
def __init__(self):
ApplicationExtensionABC.__init__(self)
@ -15,6 +14,5 @@ class InitBotExtension(ApplicationExtensionABC):
settings = config.get_configuration(BotSettings)
bot: DiscordBotServiceABC = services.get_service(
DiscordBotServiceABC,
max_messages=settings.cache_max_messages
DiscordBotServiceABC, max_messages=settings.cache_max_messages
)

View File

@ -19,23 +19,24 @@ from modules.database.database_extension import DatabaseExtension
class Program:
def __init__(self):
self.app: Optional[Application] = None
async def start(self):
# discord extension has to be loaded before modules (modules depends on discord stuff)
app_builder = ApplicationBuilder(Application) \
.use_extension(StartupSettingsExtension) \
.use_extension(StartupDiscordExtension) \
.use_extension(StartupModuleExtension) \
.use_extension(StartupMigrationExtension) \
.use_extension(InitBotExtension) \
.use_extension(BootLogExtension) \
.use_extension(DatabaseExtension) \
.use_extension(AppApiExtension) \
.use_extension(CoreExtension) \
app_builder = (
ApplicationBuilder(Application)
.use_extension(StartupSettingsExtension)
.use_extension(StartupDiscordExtension)
.use_extension(StartupModuleExtension)
.use_extension(StartupMigrationExtension)
.use_extension(InitBotExtension)
.use_extension(BootLogExtension)
.use_extension(DatabaseExtension)
.use_extension(AppApiExtension)
.use_extension(CoreExtension)
.use_startup(Startup)
)
self.app: Application = await app_builder.build_async()
await self.app.run_async()
@ -52,19 +53,25 @@ def main():
except KeyboardInterrupt:
asyncio.run(program.stop())
except Exception as e:
Console.error(f'[ ERROR ] [ {__name__} ]: Cannot start the bot', f'{e} -> {traceback.format_exc()}')
Console.error(
f"[ ERROR ] [ {__name__} ]: Cannot start the bot",
f"{e} -> {traceback.format_exc()}",
)
finally:
try:
asyncio.run(program.stop())
except Exception as e:
Console.error(f'[ ERROR ] [ {__name__} ]: Cannot stop the bot', f'{e} -> {traceback.format_exc()}')
Console.error(
f"[ ERROR ] [ {__name__} ]: Cannot stop the bot",
f"{e} -> {traceback.format_exc()}",
)
if program.app is not None and program.app.is_restart():
del program
main()
if __name__ == '__main__':
if __name__ == "__main__":
main()
# ((

View File

@ -15,22 +15,24 @@ from modules.technician.technician_module import TechnicianModule
class ModuleList:
@staticmethod
def get_modules():
# core modules (modules out of modules folder) should be loaded first!
return List(type, [
CoreModule, # has to be first!
DataModule,
PermissionModule,
DatabaseModule,
AutoRoleModule,
BaseModule,
LevelModule,
ApiModule,
StatsModule,
TechnicianModule,
# has to be last!
BootLogModule,
CoreExtensionModule,
])
return List(
type,
[
CoreModule, # has to be first!
DataModule,
PermissionModule,
DatabaseModule,
AutoRoleModule,
BaseModule,
LevelModule,
ApiModule,
StatsModule,
TechnicianModule,
# has to be last!
BootLogModule,
CoreExtensionModule,
],
)

View File

@ -20,7 +20,6 @@ from bot_data.db_context import DBContext
class Startup(StartupABC):
def __init__(self):
StartupABC.__init__(self)
self._start_time = datetime.now()
@ -28,12 +27,16 @@ class Startup(StartupABC):
self._config: Optional[ConfigurationABC] = None
self._feature_flags: Optional[FeatureFlagsSettings] = None
def configure_configuration(self, configuration: ConfigurationABC, environment: ApplicationEnvironment) -> ConfigurationABC:
def configure_configuration(
self, configuration: ConfigurationABC, environment: ApplicationEnvironment
) -> ConfigurationABC:
self._config = configuration
self._feature_flags = configuration.get_configuration(FeatureFlagsSettings)
return configuration
def configure_services(self, services: ServiceCollectionABC, environment: ApplicationEnvironment) -> ServiceProviderABC:
def configure_services(
self, services: ServiceCollectionABC, environment: ApplicationEnvironment
) -> ServiceProviderABC:
services.add_logging()
if self._feature_flags.get_flag(FeatureFlagsEnum.core_module):
# custom logging
@ -45,16 +48,20 @@ class Startup(StartupABC):
services.add_singleton(CustomFileLoggerABC, ApiLogger)
services.add_translation()
services.add_db_context(DBContext, self._config.get_configuration(DatabaseSettings))
services.add_db_context(
DBContext, self._config.get_configuration(DatabaseSettings)
)
provider = services.build_service_provider()
# instantiate custom logger
for c in CustomFileLoggerABC.__subclasses__():
i: LoggerABC = provider.get_service(c)
logger: LoggerABC = provider.get_service(LoggerABC)
for flag in [f for f in FeatureFlagsEnum]:
logger.debug(__name__, f'Loaded feature-flag: {flag} = {self._feature_flags.get_flag(flag)}')
logger.debug(
__name__,
f"Loaded feature-flag: {flag} = {self._feature_flags.get_flag(flag)}",
)
return provider

View File

@ -6,13 +6,16 @@ from cpl_discord import get_discord_collection
class StartupDiscordExtension(StartupExtensionABC):
def __init__(self):
pass
def configure_configuration(self, config: ConfigurationABC, env: ApplicationEnvironmentABC):
def configure_configuration(
self, config: ConfigurationABC, env: ApplicationEnvironmentABC
):
pass
def configure_services(self, services: ServiceCollectionABC, env: ApplicationEnvironmentABC):
def configure_services(
self, services: ServiceCollectionABC, env: ApplicationEnvironmentABC
):
services.add_discord()
dcc = get_discord_collection(services)

View File

@ -10,24 +10,35 @@ from bot_data.migration.auto_role_migration import AutoRoleMigration
from bot_data.migration.initial_migration import InitialMigration
from bot_data.migration.level_migration import LevelMigration
from bot_data.migration.stats_migration import StatsMigration
from bot_data.migration.user_message_count_per_hour_migration import UserMessageCountPerHourMigration
from bot_data.migration.user_message_count_per_hour_migration import (
UserMessageCountPerHourMigration,
)
from bot_data.service.migration_service import MigrationService
class StartupMigrationExtension(StartupExtensionABC):
def __init__(self):
pass
def configure_configuration(self, config: ConfigurationABC, env: ApplicationEnvironmentABC):
def configure_configuration(
self, config: ConfigurationABC, env: ApplicationEnvironmentABC
):
pass
def configure_services(self, services: ServiceCollectionABC, env: ApplicationEnvironmentABC):
def configure_services(
self, services: ServiceCollectionABC, env: ApplicationEnvironmentABC
):
services.add_transient(MigrationService)
services.add_transient(MigrationABC, InitialMigration)
services.add_transient(MigrationABC, AutoRoleMigration) # 03.10.2022 #54 - 0.2.2
services.add_transient(
MigrationABC, AutoRoleMigration
) # 03.10.2022 #54 - 0.2.2
services.add_transient(MigrationABC, ApiMigration) # 15.10.2022 #70 - 0.3.0
services.add_transient(MigrationABC, LevelMigration) # 06.11.2022 #25 - 0.3.0
services.add_transient(MigrationABC, StatsMigration) # 09.11.2022 #46 - 0.3.0
services.add_transient(MigrationABC, AutoRoleFix1Migration) # 30.12.2022 #151 - 0.3.0
services.add_transient(MigrationABC, UserMessageCountPerHourMigration) # 11.01.2023 #168 - 0.3.1
services.add_transient(
MigrationABC, AutoRoleFix1Migration
) # 30.12.2022 #151 - 0.3.0
services.add_transient(
MigrationABC, UserMessageCountPerHourMigration
) # 11.01.2023 #168 - 0.3.1

View File

@ -12,18 +12,21 @@ from bot_core.configuration.feature_flags_settings import FeatureFlagsSettings
class StartupModuleExtension(StartupExtensionABC):
def __init__(self):
self._config: Optional[ConfigurationABC] = None
self._feature_flags: Optional[FeatureFlagsSettings] = None
self._modules = ModuleList.get_modules()
def configure_configuration(self, config: ConfigurationABC, env: ApplicationEnvironmentABC):
def configure_configuration(
self, config: ConfigurationABC, env: ApplicationEnvironmentABC
):
self._config = config
self._feature_flags = config.get_configuration(FeatureFlagsSettings)
def configure_services(self, services: ServiceCollectionABC, env: ApplicationEnvironmentABC):
def configure_services(
self, services: ServiceCollectionABC, env: ApplicationEnvironmentABC
):
provider = services.build_service_provider()
dc_collection: DiscordCollectionABC = provider.get_service(DiscordCollectionABC)
@ -33,7 +36,7 @@ class StartupModuleExtension(StartupExtensionABC):
continue
Console.set_foreground_color(ForegroundColorEnum.green)
Console.write_line(f'[{__name__}] Loaded module: {module_type}')
Console.write_line(f"[{__name__}] Loaded module: {module_type}")
Console.color_reset()
module.configure_configuration(self._config, env)
module.configure_services(services, env)

View File

@ -16,40 +16,67 @@ from modules.permission.configuration.permission_settings import PermissionSetti
class StartupSettingsExtension(StartupExtensionABC):
def __init__(self):
self._start_time = datetime.now()
def configure_configuration(self, configuration: ConfigurationABC, environment: ApplicationEnvironmentABC):
def configure_configuration(
self, configuration: ConfigurationABC, environment: ApplicationEnvironmentABC
):
# this shit has to be done here because we need settings in subsequent startup extensions
environment.set_working_directory(os.path.dirname(os.path.realpath(__file__)))
configuration.add_environment_variables('KDB_')
configuration.add_environment_variables('DISCORD_')
configuration.add_environment_variables("KDB_")
configuration.add_environment_variables("DISCORD_")
configuration.add_json_file(f'config/appsettings.json', optional=False)
configuration.add_json_file(f'config/appsettings.{environment.environment_name}.json', optional=True)
configuration.add_json_file(f'config/appsettings.{environment.host_name}.json', optional=True)
configuration.add_json_file(f"config/appsettings.json", optional=False)
configuration.add_json_file(
f"config/appsettings.{environment.environment_name}.json", optional=True
)
configuration.add_json_file(
f"config/appsettings.{environment.host_name}.json", optional=True
)
# load feature-flags
configuration.add_json_file(f'config/feature-flags.json', optional=False)
configuration.add_json_file(f'config/feature-flags.{environment.environment_name}.json', optional=True)
configuration.add_json_file(f'config/feature-flags.{environment.host_name}.json', optional=True)
configuration.add_json_file(f"config/feature-flags.json", optional=False)
configuration.add_json_file(
f"config/feature-flags.{environment.environment_name}.json", optional=True
)
configuration.add_json_file(
f"config/feature-flags.{environment.host_name}.json", optional=True
)
configuration.add_configuration('Startup_StartTime', str(self._start_time))
self._configure_settings_with_sub_settings(configuration, BotSettings, lambda x: x.servers, lambda x: x.id)
self._configure_settings_with_sub_settings(configuration, BaseSettings, lambda x: x.servers, lambda x: x.id)
self._configure_settings_with_sub_settings(configuration, BootLogSettings, lambda x: x.servers, lambda x: x.id)
self._configure_settings_with_sub_settings(configuration, LevelSettings, lambda x: x.servers, lambda x: x.id)
self._configure_settings_with_sub_settings(configuration, PermissionSettings, lambda x: x.servers, lambda x: x.id)
self._configure_settings_with_sub_settings(configuration, BotLoggingSettings, lambda x: x.files, lambda x: x.key)
configuration.add_configuration("Startup_StartTime", str(self._start_time))
self._configure_settings_with_sub_settings(
configuration, BotSettings, lambda x: x.servers, lambda x: x.id
)
self._configure_settings_with_sub_settings(
configuration, BaseSettings, lambda x: x.servers, lambda x: x.id
)
self._configure_settings_with_sub_settings(
configuration, BootLogSettings, lambda x: x.servers, lambda x: x.id
)
self._configure_settings_with_sub_settings(
configuration, LevelSettings, lambda x: x.servers, lambda x: x.id
)
self._configure_settings_with_sub_settings(
configuration, PermissionSettings, lambda x: x.servers, lambda x: x.id
)
self._configure_settings_with_sub_settings(
configuration, BotLoggingSettings, lambda x: x.files, lambda x: x.key
)
def configure_services(self, services: ServiceCollectionABC, env: ApplicationEnvironmentABC):
def configure_services(
self, services: ServiceCollectionABC, env: ApplicationEnvironmentABC
):
pass
@staticmethod
def _configure_settings_with_sub_settings(config: ConfigurationABC, settings: Type, list_atr: Callable, atr: Callable):
def _configure_settings_with_sub_settings(
config: ConfigurationABC, settings: Type, list_atr: Callable, atr: Callable
):
settings: Optional[settings] = config.get_configuration(settings)
if settings is None:
return
for sub_settings in list_atr(settings):
config.add_configuration(f'{type(sub_settings).__name__}_{atr(sub_settings)}', sub_settings)
config.add_configuration(
f"{type(sub_settings).__name__}_{atr(sub_settings)}", sub_settings
)

View File

@ -11,16 +11,16 @@ Discord bot for the Keksdose discord Server
"""
__title__ = 'bot_api'
__author__ = 'Sven Heidemann'
__license__ = 'MIT'
__copyright__ = 'Copyright (c) 2022 sh-edraft.de'
__version__ = '0.3.0'
__title__ = "bot_api"
__author__ = "Sven Heidemann"
__license__ = "MIT"
__copyright__ = "Copyright (c) 2022 sh-edraft.de"
__version__ = "0.3.0"
from collections import namedtuple
# imports:
# imports:
VersionInfo = namedtuple('VersionInfo', 'major minor micro')
version_info = VersionInfo(major='0', minor='3', micro='0')
VersionInfo = namedtuple("VersionInfo", "major minor micro")
version_info = VersionInfo(major="0", minor="3", micro="0")

View File

@ -11,16 +11,16 @@ Discord bot for the Keksdose discord Server
"""
__title__ = 'bot_api.abc'
__author__ = 'Sven Heidemann'
__license__ = 'MIT'
__copyright__ = 'Copyright (c) 2022 sh-edraft.de'
__version__ = '0.3.0'
__title__ = "bot_api.abc"
__author__ = "Sven Heidemann"
__license__ = "MIT"
__copyright__ = "Copyright (c) 2022 sh-edraft.de"
__version__ = "0.3.0"
from collections import namedtuple
# imports:
VersionInfo = namedtuple('VersionInfo', 'major minor micro')
version_info = VersionInfo(major='0', minor='3', micro='0')
VersionInfo = namedtuple("VersionInfo", "major minor micro")
version_info = VersionInfo(major="0", minor="3", micro="0")

View File

@ -15,78 +15,108 @@ from bot_data.model.auth_user import AuthUser
class AuthServiceABC(ABC):
@abstractmethod
def __init__(self):
pass
@abstractmethod
def __init__(self): pass
def generate_token(self, user: AuthUser) -> str:
pass
@abstractmethod
def generate_token(self, user: AuthUser) -> str: pass
def decode_token(self, token: str) -> dict:
pass
@abstractmethod
def decode_token(self, token: str) -> dict: pass
def get_decoded_token_from_request(self) -> dict:
pass
@abstractmethod
def get_decoded_token_from_request(self) -> dict: pass
def find_decoded_token_from_request(self) -> Optional[dict]:
pass
@abstractmethod
def find_decoded_token_from_request(self) -> Optional[dict]: pass
async def get_all_auth_users_async(self) -> List[AuthUserDTO]:
pass
@abstractmethod
async def get_all_auth_users_async(self) -> List[AuthUserDTO]: pass
async def get_filtered_auth_users_async(
self, criteria: AuthUserSelectCriteria
) -> AuthUserFilteredResultDTO:
pass
@abstractmethod
async def get_filtered_auth_users_async(self, criteria: AuthUserSelectCriteria) -> AuthUserFilteredResultDTO: pass
async def get_auth_user_by_email_async(
self, email: str, with_password: bool = False
) -> AuthUserDTO:
pass
@abstractmethod
async def get_auth_user_by_email_async(self, email: str, with_password: bool = False) -> AuthUserDTO: pass
async def find_auth_user_by_email_async(self, email: str) -> AuthUserDTO:
pass
@abstractmethod
async def find_auth_user_by_email_async(self, email: str) -> AuthUserDTO: pass
async def add_auth_user_async(self, user_dto: AuthUserDTO):
pass
@abstractmethod
async def add_auth_user_async(self, user_dto: AuthUserDTO): pass
async def add_auth_user_by_oauth_async(self, dto: OAuthDTO):
pass
@abstractmethod
async def add_auth_user_by_oauth_async(self, dto: OAuthDTO): pass
async def add_auth_user_by_discord_async(
self, user_dto: AuthUserDTO, dc_id: int
) -> OAuthDTO:
pass
@abstractmethod
async def add_auth_user_by_discord_async(self, user_dto: AuthUserDTO, dc_id: int) -> OAuthDTO: pass
async def update_user_async(self, update_user_dto: UpdateAuthUserDTO):
pass
@abstractmethod
async def update_user_async(self, update_user_dto: UpdateAuthUserDTO): pass
async def update_user_as_admin_async(self, update_user_dto: UpdateAuthUserDTO):
pass
@abstractmethod
async def update_user_as_admin_async(self, update_user_dto: UpdateAuthUserDTO): pass
async def delete_auth_user_by_email_async(self, email: str):
pass
@abstractmethod
async def delete_auth_user_by_email_async(self, email: str): pass
async def delete_auth_user_async(self, user_dto: AuthUserDTO):
pass
@abstractmethod
async def delete_auth_user_async(self, user_dto: AuthUserDTO): pass
async def verify_login(self, token_str: str) -> bool:
pass
@abstractmethod
async def verify_login(self, token_str: str) -> bool: pass
async def login_async(self, user_dto: AuthUserDTO) -> TokenDTO:
pass
@abstractmethod
async def login_async(self, user_dto: AuthUserDTO) -> TokenDTO: pass
async def login_discord_async(self, oauth_dto: AuthUserDTO) -> TokenDTO:
pass
@abstractmethod
async def login_discord_async(self, oauth_dto: AuthUserDTO) -> TokenDTO: pass
async def refresh_async(self, token_dto: TokenDTO) -> TokenDTO:
pass
@abstractmethod
async def refresh_async(self, token_dto: TokenDTO) -> TokenDTO: pass
async def revoke_async(self, token_dto: TokenDTO):
pass
@abstractmethod
async def revoke_async(self, token_dto: TokenDTO): pass
async def confirm_email_async(self, id: str) -> bool:
pass
@abstractmethod
async def confirm_email_async(self, id: str) -> bool: pass
async def forgot_password_async(self, email: str):
pass
@abstractmethod
async def forgot_password_async(self, email: str): pass
async def confirm_forgot_password_async(self, id: str) -> EMailStringDTO:
pass
@abstractmethod
async def confirm_forgot_password_async(self, id: str) -> EMailStringDTO: pass
@abstractmethod
async def reset_password_async(self, rp_dto: ResetPasswordDTO): pass
async def reset_password_async(self, rp_dto: ResetPasswordDTO):
pass

View File

@ -2,12 +2,14 @@ from abc import ABC, abstractmethod
class DtoABC(ABC):
@abstractmethod
def __init__(self):
pass
@abstractmethod
def __init__(self): pass
def from_dict(self, values: dict):
pass
@abstractmethod
def from_dict(self, values: dict): pass
@abstractmethod
def to_dict(self) -> dict: pass
def to_dict(self) -> dict:
pass

View File

@ -2,14 +2,9 @@ from abc import ABC, abstractmethod
class SelectCriteriaABC(ABC):
@abstractmethod
def __init__(
self,
page_index: int,
page_size: int,
sort_direction: str,
sort_column: str
self, page_index: int, page_size: int, sort_direction: str, sort_column: str
):
self.page_index = page_index
self.page_size = page_size

View File

@ -6,11 +6,12 @@ from bot_api.abc.dto_abc import DtoABC
class TransformerABC:
@staticmethod
@abstractmethod
def to_db(dto: DtoABC) -> TableABC:
pass
@staticmethod
@abstractmethod
def to_db(dto: DtoABC) -> TableABC: pass
@staticmethod
@abstractmethod
def to_dto(db: TableABC) -> DtoABC: pass
def to_dto(db: TableABC) -> DtoABC:
pass

View File

@ -25,18 +25,18 @@ from bot_api.route.route import Route
class Api(Flask):
def __init__(
self,
logger: ApiLogger,
services: ServiceProviderABC,
api_settings: ApiSettings,
frontend_settings: FrontendSettings,
auth_settings: AuthenticationSettings,
*args, **kwargs
self,
logger: ApiLogger,
services: ServiceProviderABC,
api_settings: ApiSettings,
frontend_settings: FrontendSettings,
auth_settings: AuthenticationSettings,
*args,
**kwargs,
):
if not args:
kwargs.setdefault('import_name', __name__)
kwargs.setdefault("import_name", __name__)
Flask.__init__(self, *args, **kwargs)
@ -56,17 +56,21 @@ class Api(Flask):
self.register_error_handler(exc_class, self.handle_exception)
# websockets
self._socketio = SocketIO(self, cors_allowed_origins='*', path='/api/socket.io')
self._socketio.on_event('connect', self.on_connect)
self._socketio.on_event('disconnect', self.on_disconnect)
self._socketio = SocketIO(self, cors_allowed_origins="*", path="/api/socket.io")
self._socketio.on_event("connect", self.on_connect)
self._socketio.on_event("disconnect", self.on_disconnect)
self._requests = {}
@staticmethod
def _get_methods_from_registered_route() -> Union[list[str], str]:
methods = ['Unknown']
if request.path in Route.registered_routes and len(Route.registered_routes[request.path]) >= 1 and 'methods' in Route.registered_routes[request.path][1]:
methods = Route.registered_routes[request.path][1]['methods']
methods = ["Unknown"]
if (
request.path in Route.registered_routes
and len(Route.registered_routes[request.path]) >= 1
and "methods" in Route.registered_routes[request.path][1]
):
methods = Route.registered_routes[request.path][1]["methods"]
if len(methods) == 1:
return methods[0]
@ -77,7 +81,7 @@ class Api(Flask):
route = f[0]
kwargs = f[1]
cls = None
qual_name_split = route.__qualname__.split('.')
qual_name_split = route.__qualname__.split(".")
if len(qual_name_split) > 0:
cls_type = vars(sys.modules[route.__module__])[qual_name_split[0]]
cls = self._services.get_service(cls_type)
@ -87,7 +91,7 @@ class Api(Flask):
self.route(path, **kwargs)(partial_f)
def handle_exception(self, e: Exception):
self._logger.error(__name__, f'Caught error', e)
self._logger.error(__name__, f"Caught error", e)
if isinstance(e, ServiceException):
ex: ServiceException = e
@ -100,7 +104,7 @@ class Api(Flask):
return jsonify(error.to_dict()), 404
else:
tracking_id = uuid.uuid4()
user_message = f'Tracking Id: {tracking_id}'
user_message = f"Tracking Id: {tracking_id}"
self._logger.error(__name__, user_message, e)
error = ErrorDTO(None, user_message)
return jsonify(error.to_dict()), 400
@ -110,34 +114,44 @@ class Api(Flask):
self._requests[request] = request_id
method = request.access_control_request_method
self._logger.info(__name__, f'Received {request_id} @ {self._get_methods_from_registered_route() if method is None else method} {request.url} from {request.remote_addr}')
self._logger.info(
__name__,
f"Received {request_id} @ {self._get_methods_from_registered_route() if method is None else method} {request.url} from {request.remote_addr}",
)
headers = str(request.headers).replace('\n', '\n\t\t')
headers = str(request.headers).replace("\n", "\n\t\t")
data = request.get_data()
data = '' if len(data) == 0 else str(data.decode(encoding="utf-8"))
data = "" if len(data) == 0 else str(data.decode(encoding="utf-8"))
text = textwrap.dedent(f'Request: {request_id}:\n\tHeader:\n\t\t{headers}\n\tUser-Agent: {request.user_agent.string}\n\tBody: {data}')
text = textwrap.dedent(
f"Request: {request_id}:\n\tHeader:\n\t\t{headers}\n\tUser-Agent: {request.user_agent.string}\n\tBody: {data}"
)
self._logger.trace(__name__, text)
def after_request_hook(self, response: Response):
method = request.access_control_request_method
request_id = f'{self._get_methods_from_registered_route() if method is None else method} {request.url} from {request.remote_addr}'
request_id = f"{self._get_methods_from_registered_route() if method is None else method} {request.url} from {request.remote_addr}"
if request in self._requests:
request_id = self._requests[request]
self._logger.info(__name__, f'Answered {request_id}')
self._logger.info(__name__, f"Answered {request_id}")
headers = str(request.headers).replace('\n', '\n\t\t')
headers = str(request.headers).replace("\n", "\n\t\t")
data = request.get_data()
data = '' if len(data) == 0 else str(data.decode(encoding="utf-8"))
data = "" if len(data) == 0 else str(data.decode(encoding="utf-8"))
text = textwrap.dedent(f'Request: {request_id}:\n\tHeader:\n\t\t{headers}\n\tResponse: {data}')
text = textwrap.dedent(
f"Request: {request_id}:\n\tHeader:\n\t\t{headers}\n\tResponse: {data}"
)
self._logger.trace(__name__, text)
return response
def start(self):
self._logger.info(__name__, f'Starting API {self._api_settings.host}:{self._api_settings.port}')
self._logger.info(
__name__,
f"Starting API {self._api_settings.host}:{self._api_settings.port}",
)
self._register_routes()
self.secret_key = CredentialManager.decrypt(self._auth_settings.secret_key)
# from waitress import serve
@ -146,11 +160,11 @@ class Api(Flask):
wsgi.server(
eventlet.listen((self._api_settings.host, self._api_settings.port)),
self,
log_output=False
log_output=False,
)
def on_connect(self):
self._logger.info(__name__, f'Client connected')
self._logger.info(__name__, f"Client connected")
def on_disconnect(self):
self._logger.info(__name__, f'Client disconnected')
self._logger.info(__name__, f"Client disconnected")

View File

@ -23,19 +23,24 @@ from bot_core.configuration.feature_flags_enum import FeatureFlagsEnum
class ApiModule(ModuleABC):
def __init__(self, dc: DiscordCollectionABC):
ModuleABC.__init__(self, dc, FeatureFlagsEnum.api_module)
def configure_configuration(self, config: ConfigurationABC, env: ApplicationEnvironmentABC):
def configure_configuration(
self, config: ConfigurationABC, env: ApplicationEnvironmentABC
):
cwd = env.working_directory
env.set_working_directory(os.path.dirname(os.path.realpath(__file__)))
config.add_json_file(f'config/apisettings.json', optional=False)
config.add_json_file(f'config/apisettings.{env.environment_name}.json', optional=True)
config.add_json_file(f'config/apisettings.{env.host_name}.json', optional=True)
config.add_json_file(f"config/apisettings.json", optional=False)
config.add_json_file(
f"config/apisettings.{env.environment_name}.json", optional=True
)
config.add_json_file(f"config/apisettings.{env.host_name}.json", optional=True)
env.set_working_directory(cwd)
def configure_services(self, services: ServiceCollectionABC, env: ApplicationEnvironmentABC):
def configure_services(
self, services: ServiceCollectionABC, env: ApplicationEnvironmentABC
):
services.add_singleton(EMailClientABC, EMailClient)
services.add_singleton(ApiThread)

View File

@ -5,12 +5,7 @@ from bot_api.logging.api_logger import ApiLogger
class ApiThread(threading.Thread):
def __init__(
self,
logger: ApiLogger,
api: Api
):
def __init__(self, logger: ApiLogger, api: Api):
threading.Thread.__init__(self, daemon=True)
self._logger = logger
@ -18,7 +13,7 @@ class ApiThread(threading.Thread):
def run(self) -> None:
try:
self._logger.trace(__name__, f'Try to start {type(self._api).__name__}')
self._logger.trace(__name__, f"Try to start {type(self._api).__name__}")
self._api.start()
except Exception as e:
self._logger.error(__name__, 'Start failed', e)
self._logger.error(__name__, "Start failed", e)

View File

@ -11,16 +11,19 @@ from bot_data.abc.auth_user_repository_abc import AuthUserRepositoryABC
class AppApiExtension(ApplicationExtensionABC):
def __init__(self):
ApplicationExtensionABC.__init__(self)
async def run(self, config: ConfigurationABC, services: ServiceProviderABC):
feature_flags: FeatureFlagsSettings = config.get_configuration(FeatureFlagsSettings)
feature_flags: FeatureFlagsSettings = config.get_configuration(
FeatureFlagsSettings
)
if not feature_flags.get_flag(FeatureFlagsEnum.api_module):
return
auth_settings: AuthenticationSettings = config.get_configuration(AuthenticationSettings)
auth_settings: AuthenticationSettings = config.get_configuration(
AuthenticationSettings
)
auth_users: AuthUserRepositoryABC = services.get_service(AuthUserRepositoryABC)
auth: AuthServiceABC = services.get_service(AuthServiceABC)
Route.init_authorize(auth_users, auth)

View File

@ -11,16 +11,16 @@ Discord bot for the Keksdose discord Server
"""
__title__ = 'bot_api.configuration'
__author__ = 'Sven Heidemann'
__license__ = 'MIT'
__copyright__ = 'Copyright (c) 2022 sh-edraft.de'
__version__ = '0.3.0'
__title__ = "bot_api.configuration"
__author__ = "Sven Heidemann"
__license__ = "MIT"
__copyright__ = "Copyright (c) 2022 sh-edraft.de"
__version__ = "0.3.0"
from collections import namedtuple
# imports
VersionInfo = namedtuple('VersionInfo', 'major minor micro')
version_info = VersionInfo(major='0', minor='3', micro='0')
VersionInfo = namedtuple("VersionInfo", "major minor micro")
version_info = VersionInfo(major="0", minor="3", micro="0")

View File

@ -5,12 +5,11 @@ from cpl_core.console import Console
class ApiSettings(ConfigurationModelABC):
def __init__(self):
ConfigurationModelABC.__init__(self)
self._port = 80
self._host = ''
self._host = ""
self._redirect_to_https = False
@property
@ -27,9 +26,13 @@ class ApiSettings(ConfigurationModelABC):
def from_dict(self, settings: dict):
try:
self._port = int(settings['Port'])
self._host = settings['Host']
self._redirect_to_https = bool(settings['RedirectToHTTPS'])
self._port = int(settings["Port"])
self._host = settings["Host"]
self._redirect_to_https = bool(settings["RedirectToHTTPS"])
except Exception as e:
Console.error(f'[ ERROR ] [ {__name__} ]: Reading error in {type(self).__name__} settings')
Console.error(f'[ EXCEPTION ] [ {__name__} ]: {e} -> {traceback.format_exc()}')
Console.error(
f"[ ERROR ] [ {__name__} ]: Reading error in {type(self).__name__} settings"
)
Console.error(
f"[ EXCEPTION ] [ {__name__} ]: {e} -> {traceback.format_exc()}"
)

View File

@ -6,13 +6,12 @@ from cpl_core.console import Console
class AuthenticationSettings(ConfigurationModelABC):
def __init__(self):
ConfigurationModelABC.__init__(self)
self._secret_key = ''
self._issuer = ''
self._audience = ''
self._secret_key = ""
self._issuer = ""
self._audience = ""
self._token_expire_time = 0
self._refresh_token_expire_time = 0
@ -38,11 +37,15 @@ class AuthenticationSettings(ConfigurationModelABC):
def from_dict(self, settings: dict):
try:
self._secret_key = settings['SecretKey']
self._issuer = settings['Issuer']
self._audience = settings['Audience']
self._token_expire_time = int(settings['TokenExpireTime'])
self._refresh_token_expire_time = int(settings['RefreshTokenExpireTime'])
self._secret_key = settings["SecretKey"]
self._issuer = settings["Issuer"]
self._audience = settings["Audience"]
self._token_expire_time = int(settings["TokenExpireTime"])
self._refresh_token_expire_time = int(settings["RefreshTokenExpireTime"])
except Exception as e:
Console.error(f'[ ERROR ] [ {__name__} ]: Reading error in {type(self).__name__} settings')
Console.error(f'[ EXCEPTION ] [ {__name__} ]: {e} -> {traceback.format_exc()}')
Console.error(
f"[ ERROR ] [ {__name__} ]: Reading error in {type(self).__name__} settings"
)
Console.error(
f"[ EXCEPTION ] [ {__name__} ]: {e} -> {traceback.format_exc()}"
)

View File

@ -6,15 +6,14 @@ from cpl_query.extension import List
class DiscordAuthenticationSettings(ConfigurationModelABC):
def __init__(self):
ConfigurationModelABC.__init__(self)
self._client_secret = ''
self._redirect_url = ''
self._client_secret = ""
self._redirect_url = ""
self._scope = List()
self._token_url = ''
self._auth_url = ''
self._token_url = ""
self._auth_url = ""
@property
def client_secret(self) -> str:
@ -38,11 +37,15 @@ class DiscordAuthenticationSettings(ConfigurationModelABC):
def from_dict(self, settings: dict):
try:
self._client_secret = settings['ClientSecret']
self._redirect_url = settings['RedirectURL']
self._scope = List(str, settings['Scope'])
self._token_url = settings['TokenURL']
self._auth_url = settings['AuthURL']
self._client_secret = settings["ClientSecret"]
self._redirect_url = settings["RedirectURL"]
self._scope = List(str, settings["Scope"])
self._token_url = settings["TokenURL"]
self._auth_url = settings["AuthURL"]
except Exception as e:
Console.error(f'[ ERROR ] [ {__name__} ]: Reading error in {type(self).__name__} settings')
Console.error(f'[ EXCEPTION ] [ {__name__} ]: {e} -> {traceback.format_exc()}')
Console.error(
f"[ ERROR ] [ {__name__} ]: Reading error in {type(self).__name__} settings"
)
Console.error(
f"[ EXCEPTION ] [ {__name__} ]: {e} -> {traceback.format_exc()}"
)

View File

@ -5,11 +5,10 @@ from cpl_core.console import Console
class FrontendSettings(ConfigurationModelABC):
def __init__(self):
ConfigurationModelABC.__init__(self)
self._url = ''
self._url = ""
@property
def url(self) -> str:
@ -17,7 +16,11 @@ class FrontendSettings(ConfigurationModelABC):
def from_dict(self, settings: dict):
try:
self._url = settings['URL']
self._url = settings["URL"]
except Exception as e:
Console.error(f'[ ERROR ] [ {__name__} ]: Reading error in {type(self).__name__} settings')
Console.error(f'[ EXCEPTION ] [ {__name__} ]: {e} -> {traceback.format_exc()}')
Console.error(
f"[ ERROR ] [ {__name__} ]: Reading error in {type(self).__name__} settings"
)
Console.error(
f"[ EXCEPTION ] [ {__name__} ]: {e} -> {traceback.format_exc()}"
)

View File

@ -5,13 +5,7 @@ from cpl_cli.configuration.version_settings_name_enum import VersionSettingsName
class VersionSettings(ConfigurationModelABC):
def __init__(
self,
major: str = None,
minor: str = None,
micro: str = None
):
def __init__(self, major: str = None, minor: str = None, micro: str = None):
ConfigurationModelABC.__init__(self)
self._major: Optional[str] = major
@ -32,15 +26,15 @@ class VersionSettings(ConfigurationModelABC):
def to_str(self) -> str:
if self._micro is None:
return f'{self._major}.{self._minor}'
return f"{self._major}.{self._minor}"
else:
return f'{self._major}.{self._minor}.{self._micro}'
return f"{self._major}.{self._minor}.{self._micro}"
def from_dict(self, settings: dict):
self._major = settings[VersionSettingsNameEnum.major.value]
self._minor = settings[VersionSettingsNameEnum.minor.value]
micro = settings[VersionSettingsNameEnum.micro.value]
if micro != '':
if micro != "":
self._micro = micro
def to_dict(self) -> dict:

View File

@ -11,16 +11,16 @@ Discord bot for the Keksdose discord Server
"""
__title__ = 'bot_api.controller'
__author__ = 'Sven Heidemann'
__license__ = 'MIT'
__copyright__ = 'Copyright (c) 2022 sh-edraft.de'
__version__ = '0.3.0'
__title__ = "bot_api.controller"
__author__ = "Sven Heidemann"
__license__ = "MIT"
__copyright__ = "Copyright (c) 2022 sh-edraft.de"
__version__ = "0.3.0"
from collections import namedtuple
# imports:
VersionInfo = namedtuple('VersionInfo', 'major minor micro')
version_info = VersionInfo(major='0', minor='3', micro='0')
VersionInfo = namedtuple("VersionInfo", "major minor micro")
version_info = VersionInfo(major="0", minor="3", micro="0")

View File

@ -20,18 +20,18 @@ from bot_data.model.auth_role_enum import AuthRoleEnum
class AuthController:
BasePath = '/api/auth'
BasePath = "/api/auth"
def __init__(
self,
config: ConfigurationABC,
env: ApplicationEnvironmentABC,
logger: ApiLogger,
t: TranslatePipe,
api: Api,
mail_settings: EMailClientSettings,
mailer: EMailClientABC,
auth_service: AuthServiceABC
self,
config: ConfigurationABC,
env: ApplicationEnvironmentABC,
logger: ApiLogger,
t: TranslatePipe,
api: Api,
mail_settings: EMailClientSettings,
mailer: EMailClientABC,
auth_service: AuthServiceABC,
):
self._config = config
self._env = env
@ -42,55 +42,61 @@ class AuthController:
self._mailer = mailer
self._auth_service = auth_service
@Route.get(f'{BasePath}/users')
@Route.get(f"{BasePath}/users")
@Route.authorize(role=AuthRoleEnum.admin)
async def get_all_users(self) -> Response:
result = await self._auth_service.get_all_auth_users_async()
return jsonify(result.select(lambda x: x.to_dict()).to_list())
@Route.post(f'{BasePath}/users/get/filtered')
@Route.post(f"{BasePath}/users/get/filtered")
@Route.authorize(role=AuthRoleEnum.admin)
async def get_filtered_users(self) -> Response:
dto: AuthUserSelectCriteria = JSONProcessor.process(AuthUserSelectCriteria, request.get_json(force=True, silent=True))
dto: AuthUserSelectCriteria = JSONProcessor.process(
AuthUserSelectCriteria, request.get_json(force=True, silent=True)
)
result = await self._auth_service.get_filtered_auth_users_async(dto)
result.result = result.result.select(lambda x: x.to_dict()).to_list()
return jsonify(result.to_dict())
@Route.get(f'{BasePath}/users/get/<email>')
@Route.get(f"{BasePath}/users/get/<email>")
@Route.authorize
async def get_user_from_email(self, email: str) -> Response:
result = await self._auth_service.get_auth_user_by_email_async(email)
return jsonify(result.to_dict())
@Route.get(f'{BasePath}/users/find/<email>')
@Route.get(f"{BasePath}/users/find/<email>")
@Route.authorize
async def find_user_from_email(self, email: str) -> Response:
result = await self._auth_service.find_auth_user_by_email_async(email)
return jsonify(result.to_dict())
@Route.post(f'{BasePath}/register')
@Route.post(f"{BasePath}/register")
async def register(self):
dto: AuthUserDTO = JSONProcessor.process(AuthUserDTO, request.get_json(force=True, silent=True))
dto: AuthUserDTO = JSONProcessor.process(
AuthUserDTO, request.get_json(force=True, silent=True)
)
await self._auth_service.add_auth_user_async(dto)
return '', 200
return "", 200
@Route.post(f'{BasePath}/register-by-id/<id>')
@Route.post(f"{BasePath}/register-by-id/<id>")
async def register_id(self, id: str):
result = await self._auth_service.confirm_email_async(id)
return jsonify(result)
@Route.post(f'{BasePath}/login')
@Route.post(f"{BasePath}/login")
async def login(self) -> Response:
dto: AuthUserDTO = JSONProcessor.process(AuthUserDTO, request.get_json(force=True, silent=True))
dto: AuthUserDTO = JSONProcessor.process(
AuthUserDTO, request.get_json(force=True, silent=True)
)
result = await self._auth_service.login_async(dto)
return jsonify(result.to_dict())
@Route.get(f'{BasePath}/verify-login')
@Route.get(f"{BasePath}/verify-login")
async def verify_login(self):
token = None
result = False
if 'Authorization' in request.headers:
bearer = request.headers.get('Authorization')
if "Authorization" in request.headers:
bearer = request.headers.get("Authorization")
token = bearer.split()[1]
if token is not None:
@ -98,58 +104,70 @@ class AuthController:
return jsonify(result)
@Route.post(f'{BasePath}/forgot-password/<email>')
@Route.post(f"{BasePath}/forgot-password/<email>")
async def forgot_password(self, email: str):
await self._auth_service.forgot_password_async(email)
return '', 200
return "", 200
@Route.post(f'{BasePath}/confirm-forgot-password/<id>')
@Route.post(f"{BasePath}/confirm-forgot-password/<id>")
async def confirm_forgot_password(self, id: str):
result = await self._auth_service.confirm_forgot_password_async(id)
return jsonify(result.to_dict())
@Route.post(f'{BasePath}/reset-password')
@Route.post(f"{BasePath}/reset-password")
async def reset_password(self):
dto: ResetPasswordDTO = JSONProcessor.process(ResetPasswordDTO, request.get_json(force=True, silent=True))
dto: ResetPasswordDTO = JSONProcessor.process(
ResetPasswordDTO, request.get_json(force=True, silent=True)
)
await self._auth_service.reset_password_async(dto)
return '', 200
return "", 200
@Route.post(f'{BasePath}/update-user')
@Route.post(f"{BasePath}/update-user")
@Route.authorize
async def update_user(self):
dto: UpdateAuthUserDTO = JSONProcessor.process(UpdateAuthUserDTO, request.get_json(force=True, silent=True))
dto: UpdateAuthUserDTO = JSONProcessor.process(
UpdateAuthUserDTO, request.get_json(force=True, silent=True)
)
await self._auth_service.update_user_async(dto)
return '', 200
return "", 200
@Route.post(f'{BasePath}/update-user-as-admin')
@Route.post(f"{BasePath}/update-user-as-admin")
@Route.authorize(role=AuthRoleEnum.admin)
async def update_user_as_admin(self):
dto: UpdateAuthUserDTO = JSONProcessor.process(UpdateAuthUserDTO, request.get_json(force=True, silent=True))
dto: UpdateAuthUserDTO = JSONProcessor.process(
UpdateAuthUserDTO, request.get_json(force=True, silent=True)
)
await self._auth_service.update_user_as_admin_async(dto)
return '', 200
return "", 200
@Route.post(f'{BasePath}/refresh')
@Route.post(f"{BasePath}/refresh")
@Route.authorize
async def refresh(self) -> Response:
dto: TokenDTO = JSONProcessor.process(TokenDTO, request.get_json(force=True, silent=True))
dto: TokenDTO = JSONProcessor.process(
TokenDTO, request.get_json(force=True, silent=True)
)
result = await self._auth_service.refresh_async(dto)
return jsonify(result.to_dict())
@Route.post(f'{BasePath}/revoke')
@Route.post(f"{BasePath}/revoke")
async def revoke(self):
dto: TokenDTO = JSONProcessor.process(TokenDTO, request.get_json(force=True, silent=True))
dto: TokenDTO = JSONProcessor.process(
TokenDTO, request.get_json(force=True, silent=True)
)
await self._auth_service.revoke_async(dto)
return '', 200
return "", 200
@Route.post(f'{BasePath}/delete-user')
@Route.post(f"{BasePath}/delete-user")
@Route.authorize(role=AuthRoleEnum.admin)
async def delete_user(self):
dto: AuthUserDTO = JSONProcessor.process(AuthUserDTO, request.get_json(force=True, silent=True))
dto: AuthUserDTO = JSONProcessor.process(
AuthUserDTO, request.get_json(force=True, silent=True)
)
await self._auth_service.delete_auth_user_async(dto)
return '', 200
return "", 200
@Route.post(f'{BasePath}/delete-user-by-mail/<email>')
@Route.post(f"{BasePath}/delete-user-by-mail/<email>")
@Route.authorize(role=AuthRoleEnum.admin)
async def delete_user_by_mail(self, email: str):
await self._auth_service.delete_auth_user_by_email_async(email)
return '', 200
return "", 200

View File

@ -13,7 +13,9 @@ from requests_oauthlib import OAuth2Session
from bot_api.abc.auth_service_abc import AuthServiceABC
from bot_api.api import Api
from bot_api.configuration.discord_authentication_settings import DiscordAuthenticationSettings
from bot_api.configuration.discord_authentication_settings import (
DiscordAuthenticationSettings,
)
from bot_api.json_processor import JSONProcessor
from bot_api.logging.api_logger import ApiLogger
from bot_api.model.auth_user_dto import AuthUserDTO
@ -22,24 +24,24 @@ from bot_api.route.route import Route
from bot_data.model.auth_role_enum import AuthRoleEnum
# Disable SSL requirement
os.environ['OAUTHLIB_INSECURE_TRANSPORT'] = '1'
os.environ["OAUTHLIB_INSECURE_TRANSPORT"] = "1"
class AuthDiscordController:
BasePath = '/api/auth/discord'
BasePath = "/api/auth/discord"
def __init__(
self,
auth_settings: DiscordAuthenticationSettings,
config: ConfigurationABC,
env: ApplicationEnvironmentABC,
logger: ApiLogger,
bot: DiscordBotServiceABC,
t: TranslatePipe,
api: Api,
mail_settings: EMailClientSettings,
mailer: EMailClientABC,
auth_service: AuthServiceABC
self,
auth_settings: DiscordAuthenticationSettings,
config: ConfigurationABC,
env: ApplicationEnvironmentABC,
logger: ApiLogger,
bot: DiscordBotServiceABC,
t: TranslatePipe,
api: Api,
mail_settings: EMailClientSettings,
mailer: EMailClientABC,
auth_service: AuthServiceABC,
):
self._auth_settings = auth_settings
self._config = config
@ -53,46 +55,58 @@ class AuthDiscordController:
self._auth_service = auth_service
def _get_user_from_discord_response(self) -> dict:
discord = OAuth2Session(self._bot.user.id, redirect_uri=self._auth_settings.redirect_url, state=request.args.get('state'), scope=self._auth_settings.scope)
discord = OAuth2Session(
self._bot.user.id,
redirect_uri=self._auth_settings.redirect_url,
state=request.args.get("state"),
scope=self._auth_settings.scope,
)
token = discord.fetch_token(
self._auth_settings.token_url,
client_secret=CredentialManager.decrypt(self._auth_settings.client_secret),
authorization_response=request.url,
)
discord = OAuth2Session(self._bot.user.id, token=token)
return discord.get('https://discordapp.com/api' + '/users/@me').json()
return discord.get("https://discordapp.com/api" + "/users/@me").json()
@Route.get(f'{BasePath}/get-url')
@Route.get(f"{BasePath}/get-url")
async def get_url(self):
oauth = OAuth2Session(self._bot.user.id, redirect_uri=self._auth_settings.redirect_url, scope=self._auth_settings.scope)
oauth = OAuth2Session(
self._bot.user.id,
redirect_uri=self._auth_settings.redirect_url,
scope=self._auth_settings.scope,
)
login_url, state = oauth.authorization_url(self._auth_settings.auth_url)
return jsonify({'loginUrl': login_url})
return jsonify({"loginUrl": login_url})
@Route.get(f'{BasePath}/create-user')
@Route.get(f"{BasePath}/create-user")
async def discord_create_user(self) -> Response:
response = self._get_user_from_discord_response()
result = await self._auth_service.add_auth_user_by_discord_async(AuthUserDTO(
0,
response['username'],
response['discriminator'],
response['email'],
str(uuid.uuid4()),
None,
AuthRoleEnum.normal
), response['id'])
result = await self._auth_service.add_auth_user_by_discord_async(
AuthUserDTO(
0,
response["username"],
response["discriminator"],
response["email"],
str(uuid.uuid4()),
None,
AuthRoleEnum.normal,
),
response["id"],
)
return jsonify(result.to_dict())
@Route.get(f'{BasePath}/login')
@Route.get(f"{BasePath}/login")
async def discord_login(self) -> Response:
response = self._get_user_from_discord_response()
dto = AuthUserDTO(
0,
response['username'],
response['discriminator'],
response['email'],
response["username"],
response["discriminator"],
response["email"],
str(uuid.uuid4()),
None,
AuthRoleEnum.normal
AuthRoleEnum.normal,
)
result = await self._auth_service.login_discord_async(dto)

View File

@ -11,16 +11,16 @@ Discord bot for the Keksdose discord Server
"""
__title__ = 'bot_api.controller.discord'
__author__ = 'Sven Heidemann'
__license__ = 'MIT'
__copyright__ = 'Copyright (c) 2022 sh-edraft.de'
__version__ = '0.3.0'
__title__ = "bot_api.controller.discord"
__author__ = "Sven Heidemann"
__license__ = "MIT"
__copyright__ = "Copyright (c) 2022 sh-edraft.de"
__version__ = "0.3.0"
from collections import namedtuple
# imports:
VersionInfo = namedtuple('VersionInfo', 'major minor micro')
version_info = VersionInfo(major='0', minor='3', micro='0')
VersionInfo = namedtuple("VersionInfo", "major minor micro")
version_info = VersionInfo(major="0", minor="3", micro="0")

View File

@ -14,18 +14,18 @@ from bot_data.model.auth_role_enum import AuthRoleEnum
class ServerController:
BasePath = f'/api/discord/server'
BasePath = f"/api/discord/server"
def __init__(
self,
config: ConfigurationABC,
env: ApplicationEnvironmentABC,
logger: ApiLogger,
t: TranslatePipe,
api: Api,
mail_settings: EMailClientSettings,
mailer: EMailClientABC,
discord_service: DiscordService
self,
config: ConfigurationABC,
env: ApplicationEnvironmentABC,
logger: ApiLogger,
t: TranslatePipe,
api: Api,
mail_settings: EMailClientSettings,
mailer: EMailClientABC,
discord_service: DiscordService,
):
self._config = config
self._env = env
@ -36,29 +36,31 @@ class ServerController:
self._mailer = mailer
self._discord_service = discord_service
@Route.get(f'{BasePath}/get/servers')
@Route.get(f"{BasePath}/get/servers")
@Route.authorize(role=AuthRoleEnum.admin)
async def get_all_servers(self) -> Response:
result = await self._discord_service.get_all_servers()
result = result.select(lambda x: x.to_dict()).to_list()
return jsonify(result)
@Route.get(f'{BasePath}/get/servers-by-user')
@Route.get(f"{BasePath}/get/servers-by-user")
@Route.authorize
async def get_all_servers_by_user(self) -> Response:
result = await self._discord_service.get_all_servers_by_user()
result = result.select(lambda x: x.to_dict()).to_list()
return jsonify(result)
@Route.post(f'{BasePath}/get/filtered')
@Route.post(f"{BasePath}/get/filtered")
@Route.authorize
async def get_filtered_servers(self) -> Response:
dto: ServerSelectCriteria = JSONProcessor.process(ServerSelectCriteria, request.get_json(force=True, silent=True))
dto: ServerSelectCriteria = JSONProcessor.process(
ServerSelectCriteria, request.get_json(force=True, silent=True)
)
result = await self._discord_service.get_filtered_servers_async(dto)
result.result = result.result.select(lambda x: x.to_dict()).to_list()
return jsonify(result.to_dict())
@Route.get(f'{BasePath}/get/<id>')
@Route.get(f"{BasePath}/get/<id>")
@Route.authorize
async def get_server_by_id(self, id: int) -> Response:
result = await self._discord_service.get_server_by_id_async(id).to_list()

View File

@ -15,18 +15,18 @@ from bot_api.route.route import Route
class GuiController:
BasePath = f'/api/gui'
BasePath = f"/api/gui"
def __init__(
self,
config: ConfigurationABC,
env: ApplicationEnvironmentABC,
logger: ApiLogger,
t: TranslatePipe,
api: Api,
mail_settings: EMailClientSettings,
mailer: EMailClientABC,
auth_settings: AuthenticationSettings
self,
config: ConfigurationABC,
env: ApplicationEnvironmentABC,
logger: ApiLogger,
t: TranslatePipe,
api: Api,
mail_settings: EMailClientSettings,
mailer: EMailClientABC,
auth_settings: AuthenticationSettings,
):
self._config = config
self._env = env
@ -37,42 +37,48 @@ class GuiController:
self._mailer = mailer
self._auth_settings = auth_settings
@Route.get(f'{BasePath}/api-version')
@Route.get(f"{BasePath}/api-version")
async def api_version(self):
import bot_api
version = bot_api.version_info
return VersionDTO(version.major, version.minor, version.micro).to_dict()
@Route.get(f'{BasePath}/settings')
@Route.get(f"{BasePath}/settings")
@Route.authorize
async def settings(self):
import bot_api
version = bot_api.version_info
return jsonify(SettingsDTO(
'',
VersionDTO(version.major, version.minor, version.micro),
os.path.abspath(os.path.join(self._env.working_directory, 'config')),
'/',
'/',
self._auth_settings.token_expire_time,
self._auth_settings.refresh_token_expire_time,
self._mail_settings.user_name,
self._mail_settings.port,
self._mail_settings.host,
self._mail_settings.user_name,
self._mail_settings.user_name,
).to_dict())
return jsonify(
SettingsDTO(
"",
VersionDTO(version.major, version.minor, version.micro),
os.path.abspath(os.path.join(self._env.working_directory, "config")),
"/",
"/",
self._auth_settings.token_expire_time,
self._auth_settings.refresh_token_expire_time,
self._mail_settings.user_name,
self._mail_settings.port,
self._mail_settings.host,
self._mail_settings.user_name,
self._mail_settings.user_name,
).to_dict()
)
@Route.post(f'{BasePath}/send-test-mail/<email>')
@Route.post(f"{BasePath}/send-test-mail/<email>")
@Route.authorize
async def send_test_mail(self, email: str):
mail = EMail()
mail.add_header('Mime-Version: 1.0')
mail.add_header('Content-Type: text/plain; charset=utf-8')
mail.add_header('Content-Transfer-Encoding: quoted-printable')
mail.add_header("Mime-Version: 1.0")
mail.add_header("Content-Type: text/plain; charset=utf-8")
mail.add_header("Content-Transfer-Encoding: quoted-printable")
mail.add_receiver(email)
mail.subject = self._t.transform('api.api.test_mail.subject')
mail.body = self._t.transform('api.api.test_mail.message').format(self._env.host_name, self._env.environment_name)
mail.subject = self._t.transform("api.api.test_mail.subject")
mail.body = self._t.transform("api.api.test_mail.message").format(
self._env.host_name, self._env.environment_name
)
self._mailer.send_mail(mail)
return '', 200
return "", 200

View File

@ -11,16 +11,16 @@ Discord bot for the Keksdose discord Server
"""
__title__ = 'bot_api.event'
__author__ = 'Sven Heidemann'
__license__ = 'MIT'
__copyright__ = 'Copyright (c) 2022 sh-edraft.de'
__version__ = '0.3.0'
__title__ = "bot_api.event"
__author__ = "Sven Heidemann"
__license__ = "MIT"
__copyright__ = "Copyright (c) 2022 sh-edraft.de"
__version__ = "0.3.0"
from collections import namedtuple
# imports:
VersionInfo = namedtuple('VersionInfo', 'major minor micro')
version_info = VersionInfo(major='0', minor='3', micro='0')
VersionInfo = namedtuple("VersionInfo", "major minor micro")
version_info = VersionInfo(major="0", minor="3", micro="0")

View File

@ -4,7 +4,6 @@ from bot_api.api_thread import ApiThread
class BotApiOnReadyEvent(OnReadyABC):
def __init__(self, api: ApiThread):
OnReadyABC.__init__(self)
self._api = api

View File

@ -11,16 +11,16 @@ Discord bot for the Keksdose discord Server
"""
__title__ = 'bot_api.exception'
__author__ = 'Sven Heidemann'
__license__ = 'MIT'
__copyright__ = 'Copyright (c) 2022 sh-edraft.de'
__version__ = '0.3.0'
__title__ = "bot_api.exception"
__author__ = "Sven Heidemann"
__license__ = "MIT"
__copyright__ = "Copyright (c) 2022 sh-edraft.de"
__version__ = "0.3.0"
from collections import namedtuple
# imports:
VersionInfo = namedtuple('VersionInfo', 'major minor micro')
version_info = VersionInfo(major='0', minor='3', micro='0')
VersionInfo = namedtuple("VersionInfo", "major minor micro")
version_info = VersionInfo(major="0", minor="3", micro="0")

View File

@ -2,7 +2,6 @@ from bot_api.exception.service_error_code_enum import ServiceErrorCode
class ServiceException(Exception):
def __init__(self, error_code: ServiceErrorCode, message: str, *args):
Exception.__init__(self, *args)
@ -10,4 +9,4 @@ class ServiceException(Exception):
self.message = message
def get_detailed_message(self) -> str:
return f'ServiceException - ErrorCode: {self.error_code} - ErrorMessage: {self.message}'
return f"ServiceException - ErrorCode: {self.error_code} - ErrorMessage: {self.message}"

View File

@ -11,16 +11,16 @@ Discord bot for the Keksdose discord Server
"""
__title__ = 'bot_api.filter'
__author__ = 'Sven Heidemann'
__license__ = 'MIT'
__copyright__ = 'Copyright (c) 2022 sh-edraft.de'
__version__ = '0.3.0'
__title__ = "bot_api.filter"
__author__ = "Sven Heidemann"
__license__ = "MIT"
__copyright__ = "Copyright (c) 2022 sh-edraft.de"
__version__ = "0.3.0"
from collections import namedtuple
# imports:
VersionInfo = namedtuple('VersionInfo', 'major minor micro')
version_info = VersionInfo(major='0', minor='3', micro='0')
VersionInfo = namedtuple("VersionInfo", "major minor micro")
version_info = VersionInfo(major="0", minor="3", micro="0")

View File

@ -2,20 +2,20 @@ from bot_api.abc.select_criteria_abc import SelectCriteriaABC
class AuthUserSelectCriteria(SelectCriteriaABC):
def __init__(
self,
page_index: int,
page_size: int,
sort_direction: str,
sort_column: str,
first_name: str,
last_name: str,
email: str,
auth_role: int
self,
page_index: int,
page_size: int,
sort_direction: str,
sort_column: str,
first_name: str,
last_name: str,
email: str,
auth_role: int,
):
SelectCriteriaABC.__init__(self, page_index, page_size, sort_direction, sort_column)
SelectCriteriaABC.__init__(
self, page_index, page_size, sort_direction, sort_column
)
self.first_name = first_name
self.last_name = last_name

View File

@ -11,16 +11,16 @@ Discord bot for the Keksdose discord Server
"""
__title__ = 'bot_api.filter.discord'
__author__ = 'Sven Heidemann'
__license__ = 'MIT'
__copyright__ = 'Copyright (c) 2022 sh-edraft.de'
__version__ = '0.3.0'
__title__ = "bot_api.filter.discord"
__author__ = "Sven Heidemann"
__license__ = "MIT"
__copyright__ = "Copyright (c) 2022 sh-edraft.de"
__version__ = "0.3.0"
from collections import namedtuple
# imports:
VersionInfo = namedtuple('VersionInfo', 'major minor micro')
version_info = VersionInfo(major='0', minor='3', micro='0')
VersionInfo = namedtuple("VersionInfo", "major minor micro")
version_info = VersionInfo(major="0", minor="3", micro="0")

View File

@ -2,16 +2,16 @@ from bot_api.abc.select_criteria_abc import SelectCriteriaABC
class ServerSelectCriteria(SelectCriteriaABC):
def __init__(
self,
page_index: int,
page_size: int,
sort_direction: str,
sort_column: str,
name: str,
self,
page_index: int,
page_size: int,
sort_direction: str,
sort_column: str,
name: str,
):
SelectCriteriaABC.__init__(self, page_index, page_size, sort_direction, sort_column)
SelectCriteriaABC.__init__(
self, page_index, page_size, sort_direction, sort_column
)
self.name = name

View File

@ -5,7 +5,6 @@ from cpl_core.utils import String
class JSONProcessor:
@staticmethod
def process(_t: type, values: dict) -> object:
args = []
@ -13,14 +12,14 @@ class JSONProcessor:
sig = signature(_t.__init__)
for param in sig.parameters.items():
parameter = param[1]
if parameter.name == 'self' or parameter.annotation == Parameter.empty:
if parameter.name == "self" or parameter.annotation == Parameter.empty:
continue
name = String.convert_to_camel_case(parameter.name)
name = name.replace('Dto', 'DTO')
name = name.replace("Dto", "DTO")
name_first_lower = String.first_to_lower(name)
if name in values or name_first_lower in values:
value = ''
value = ""
if name in values:
value = values[name]
else:

View File

@ -11,16 +11,16 @@ Discord bot for the Keksdose discord Server
"""
__title__ = 'bot_api.logging'
__author__ = 'Sven Heidemann'
__license__ = 'MIT'
__copyright__ = 'Copyright (c) 2022 sh-edraft.de'
__version__ = '0.3.0'
__title__ = "bot_api.logging"
__author__ = "Sven Heidemann"
__license__ = "MIT"
__copyright__ = "Copyright (c) 2022 sh-edraft.de"
__version__ = "0.3.0"
from collections import namedtuple
# imports:
VersionInfo = namedtuple('VersionInfo', 'major minor micro')
version_info = VersionInfo(major='0', minor='3', micro='0')
VersionInfo = namedtuple("VersionInfo", "major minor micro")
version_info = VersionInfo(major="0", minor="3", micro="0")

View File

@ -6,6 +6,10 @@ from bot_core.abc.custom_file_logger_abc import CustomFileLoggerABC
class ApiLogger(CustomFileLoggerABC):
def __init__(self, config: ConfigurationABC, time_format: TimeFormatSettings, env: ApplicationEnvironmentABC):
CustomFileLoggerABC.__init__(self, 'Api', config, time_format, env)
def __init__(
self,
config: ConfigurationABC,
time_format: TimeFormatSettings,
env: ApplicationEnvironmentABC,
):
CustomFileLoggerABC.__init__(self, "Api", config, time_format, env)

View File

@ -11,16 +11,16 @@ Discord bot for the Keksdose discord Server
"""
__title__ = 'bot_api.model'
__author__ = 'Sven Heidemann'
__license__ = 'MIT'
__copyright__ = 'Copyright (c) 2022 sh-edraft.de'
__version__ = '0.3.0'
__title__ = "bot_api.model"
__author__ = "Sven Heidemann"
__license__ = "MIT"
__copyright__ = "Copyright (c) 2022 sh-edraft.de"
__version__ = "0.3.0"
from collections import namedtuple
# imports:
VersionInfo = namedtuple('VersionInfo', 'major minor micro')
version_info = VersionInfo(major='0', minor='3', micro='0')
VersionInfo = namedtuple("VersionInfo", "major minor micro")
version_info = VersionInfo(major="0", minor="3", micro="0")

View File

@ -5,16 +5,15 @@ from bot_data.model.auth_role_enum import AuthRoleEnum
class AuthUserDTO(DtoABC):
def __init__(
self,
id: int = None,
first_name: str = None,
last_name: str = None,
email: str = None,
password: str = None,
confirmation_id: Optional[str] = None,
auth_role: AuthRoleEnum = None,
self,
id: int = None,
first_name: str = None,
last_name: str = None,
email: str = None,
password: str = None,
confirmation_id: Optional[str] = None,
auth_role: AuthRoleEnum = None,
):
DtoABC.__init__(self)
@ -79,21 +78,21 @@ class AuthUserDTO(DtoABC):
self._auth_role = value
def from_dict(self, values: dict):
self._id = values['id']
self._first_name = values['firstName']
self._last_name = values['lastName']
self._email = values['email']
self._password = values['password']
self._is_confirmed = values['isConfirmed']
self._auth_role = AuthRoleEnum(values['authRole'])
self._id = values["id"]
self._first_name = values["firstName"]
self._last_name = values["lastName"]
self._email = values["email"]
self._password = values["password"]
self._is_confirmed = values["isConfirmed"]
self._auth_role = AuthRoleEnum(values["authRole"])
def to_dict(self) -> dict:
return {
'id': self._id,
'firstName': self._first_name,
'lastName': self._last_name,
'email': self._email,
'password': self._password,
'isConfirmed': self._is_confirmed,
'authRole': self._auth_role.value,
"id": self._id,
"firstName": self._first_name,
"lastName": self._last_name,
"email": self._email,
"password": self._password,
"isConfirmed": self._is_confirmed,
"authRole": self._auth_role.value,
}

View File

@ -5,17 +5,13 @@ from bot_data.filtered_result import FilteredResult
class AuthUserFilteredResultDTO(DtoABC, FilteredResult):
def __init__(self, result: List = None, total_count: int = 0):
DtoABC.__init__(self)
FilteredResult.__init__(self, result, total_count)
def from_dict(self, values: dict):
self._result = values['users']
self._total_count = values['totalCount']
self._result = values["users"]
self._total_count = values["totalCount"]
def to_dict(self) -> dict:
return {
'users': self.result,
'totalCount': self.total_count
}
return {"users": self.result, "totalCount": self.total_count}

View File

@ -11,16 +11,16 @@ Discord bot for the Keksdose discord Server
"""
__title__ = 'bot_api.model.discord'
__author__ = 'Sven Heidemann'
__license__ = 'MIT'
__copyright__ = 'Copyright (c) 2022 sh-edraft.de'
__version__ = '0.3.0'
__title__ = "bot_api.model.discord"
__author__ = "Sven Heidemann"
__license__ = "MIT"
__copyright__ = "Copyright (c) 2022 sh-edraft.de"
__version__ = "0.3.0"
from collections import namedtuple
# imports:
VersionInfo = namedtuple('VersionInfo', 'major minor micro')
version_info = VersionInfo(major='0', minor='3', micro='0')
VersionInfo = namedtuple("VersionInfo", "major minor micro")
version_info = VersionInfo(major="0", minor="3", micro="0")

View File

@ -4,15 +4,13 @@ from bot_api.abc.dto_abc import DtoABC
class ServerDTO(DtoABC):
def __init__(
self,
server_id: int,
discord_id: int,
name: str,
member_count: int,
icon_url: Optional[str]
self,
server_id: int,
discord_id: int,
name: str,
member_count: int,
icon_url: Optional[str],
):
DtoABC.__init__(self)
@ -21,19 +19,19 @@ class ServerDTO(DtoABC):
self._name = name
self._member_count = member_count
self._icon_url = icon_url
@property
def server_id(self) -> int:
return self._server_id
@property
def discord_id(self) -> int:
return self._discord_id
@property
def name(self) -> str:
return self._name
@property
def member_count(self) -> int:
return self._member_count
@ -43,16 +41,16 @@ class ServerDTO(DtoABC):
return self._icon_url
def from_dict(self, values: dict):
self._server_id = int(values['serverId'])
self._discord_id = int(values['discordId'])
self._name = values['name']
self._icon_url = values['iconURL']
self._server_id = int(values["serverId"])
self._discord_id = int(values["discordId"])
self._name = values["name"]
self._icon_url = values["iconURL"]
def to_dict(self) -> dict:
return {
'serverId': self._server_id,
'discordId': self._discord_id,
'name': self._name,
'memberCount': self._member_count,
'iconURL': self._icon_url,
"serverId": self._server_id,
"discordId": self._discord_id,
"name": self._name,
"memberCount": self._member_count,
"iconURL": self._icon_url,
}

View File

@ -5,17 +5,13 @@ from bot_data.filtered_result import FilteredResult
class ServerFilteredResultDTO(DtoABC, FilteredResult):
def __init__(self, result: List = None, total_count: int = 0):
DtoABC.__init__(self)
FilteredResult.__init__(self, result, total_count)
def from_dict(self, values: dict):
self._result = values['servers']
self._total_count = values['totalCount']
self._result = values["servers"]
self._total_count = values["totalCount"]
def to_dict(self) -> dict:
return {
'servers': self.result,
'totalCount': self.total_count
}
return {"servers": self.result, "totalCount": self.total_count}

View File

@ -6,16 +6,13 @@ from bot_api.abc.dto_abc import DtoABC
class EMailStringDTO(DtoABC):
def __init__(self, email: str):
DtoABC.__init__(self)
self._email = email
def from_dict(self, values: dict):
self._email = values['email']
self._email = values["email"]
def to_dict(self) -> dict:
return {
'email': self._email
}
return {"email": self._email}

View File

@ -8,11 +8,12 @@ from bot_api.exception.service_error_code_enum import ServiceErrorCode
class ErrorDTO(DtoABC):
def __init__(self, error_code: Optional[ServiceErrorCode], message: str):
DtoABC.__init__(self)
self._error_code = ServiceErrorCode.Unknown if error_code is None else error_code
self._error_code = (
ServiceErrorCode.Unknown if error_code is None else error_code
)
self._message = message
@property
@ -24,11 +25,8 @@ class ErrorDTO(DtoABC):
return self._message
def from_dict(self, values: dict):
self._error_code = values['ErrorCode']
self._message = values['Message']
self._error_code = values["ErrorCode"]
self._message = values["Message"]
def to_dict(self) -> dict:
return {
'errorCode': int(self._error_code.value),
'message': self._message
}
return {"errorCode": int(self._error_code.value), "message": self._message}

View File

@ -6,11 +6,10 @@ from bot_data.model.auth_role_enum import AuthRoleEnum
class OAuthDTO(DtoABC):
def __init__(
self,
user: AuthUserDTO,
o_auth_id: Optional[str],
self,
user: AuthUserDTO,
o_auth_id: Optional[str],
):
DtoABC.__init__(self)
@ -34,11 +33,8 @@ class OAuthDTO(DtoABC):
self._oauth_id = value
def from_dict(self, values: dict):
self._user = AuthUserDTO().from_dict(values['user'])
self._oauth_id = values['oAuthId']
self._user = AuthUserDTO().from_dict(values["user"])
self._oauth_id = values["oAuthId"]
def to_dict(self) -> dict:
return {
'user': self._user.to_dict(),
'oAuthId': self._oauth_id
}
return {"user": self._user.to_dict(), "oAuthId": self._oauth_id}

View File

@ -6,7 +6,6 @@ from bot_api.abc.dto_abc import DtoABC
class ResetPasswordDTO(DtoABC):
def __init__(self, id: str, password: str):
DtoABC.__init__(self)
@ -22,11 +21,8 @@ class ResetPasswordDTO(DtoABC):
return self._password
def from_dict(self, values: dict):
self._id = values['id']
self._password = values['password']
self._id = values["id"]
self._password = values["password"]
def to_dict(self) -> dict:
return {
'id': self._id,
'password': self._password
}
return {"id": self._id, "password": self._password}

View File

@ -3,21 +3,20 @@ from bot_api.model.version_dto import VersionDTO
class SettingsDTO(DtoABC):
def __init__(
self,
web_version: str,
api_version: VersionDTO,
config_path: str,
web_base_url: str,
api_base_url: str,
token_expire_time: int,
refresh_token_expire_time: int,
mail_user: str,
mail_port: int,
mail_host: str,
mail_transceiver: str,
mail_transceiver_address: str,
self,
web_version: str,
api_version: VersionDTO,
config_path: str,
web_base_url: str,
api_base_url: str,
token_expire_time: int,
refresh_token_expire_time: int,
mail_user: str,
mail_port: int,
mail_host: str,
mail_transceiver: str,
mail_transceiver_address: str,
):
DtoABC.__init__(self)
@ -37,31 +36,31 @@ class SettingsDTO(DtoABC):
self._mail_transceiver_address = mail_transceiver_address
def from_dict(self, values: dict):
self._web_version = values['webVersion']
self._api_version.from_dict(values['apiVersion'])
self._config_path = values['configPath']
self._web_base_url = values['webBaseURL']
self._api_base_url = values['apiBaseURL']
self._token_expire_time = values['tokenExpireTime']
self._refresh_token_expire_time = values['refreshTokenExpireTime']
self._mail_user = values['mailUser']
self._mail_port = values['mailPort']
self._mail_host = values['mailHost']
self._mail_transceiver = values['mailTransceiver']
self._mail_transceiver_address = values['mailTransceiverAddress']
self._web_version = values["webVersion"]
self._api_version.from_dict(values["apiVersion"])
self._config_path = values["configPath"]
self._web_base_url = values["webBaseURL"]
self._api_base_url = values["apiBaseURL"]
self._token_expire_time = values["tokenExpireTime"]
self._refresh_token_expire_time = values["refreshTokenExpireTime"]
self._mail_user = values["mailUser"]
self._mail_port = values["mailPort"]
self._mail_host = values["mailHost"]
self._mail_transceiver = values["mailTransceiver"]
self._mail_transceiver_address = values["mailTransceiverAddress"]
def to_dict(self) -> dict:
return {
'webVersion': self._web_version,
'apiVersion': self._api_version.str,
'configPath': self._config_path,
'webBaseURL': self._web_base_url,
'apiBaseURL': self._api_base_url,
'tokenExpireTime': self._token_expire_time,
'refreshTokenExpireTime': self._refresh_token_expire_time,
'mailUser': self._mail_user,
'mailPort': self._mail_port,
'mailHost': self._mail_host,
'mailTransceiver': self._mail_transceiver,
'mailTransceiverAddress': self._mail_transceiver_address,
"webVersion": self._web_version,
"apiVersion": self._api_version.str,
"configPath": self._config_path,
"webBaseURL": self._web_base_url,
"apiBaseURL": self._api_base_url,
"tokenExpireTime": self._token_expire_time,
"refreshTokenExpireTime": self._refresh_token_expire_time,
"mailUser": self._mail_user,
"mailPort": self._mail_port,
"mailHost": self._mail_host,
"mailTransceiver": self._mail_transceiver,
"mailTransceiverAddress": self._mail_transceiver_address,
}

View File

@ -6,27 +6,23 @@ from bot_api.abc.dto_abc import DtoABC
class TokenDTO(DtoABC):
def __init__(self, token: str, refresh_token: str):
DtoABC.__init__(self)
self._token = token
self._refresh_token = refresh_token
@property
def token(self) -> str:
return self._token
@property
def refresh_token(self) -> str:
return self._refresh_token
def from_dict(self, values: dict):
self._token = values['token']
self._refresh_token = values['refreshToken']
self._token = values["token"]
self._refresh_token = values["refreshToken"]
def to_dict(self) -> dict:
return {
'token': self._token,
'refreshToken': self._refresh_token
}
return {"token": self._token, "refreshToken": self._refresh_token}

View File

@ -7,12 +7,11 @@ from bot_api.model.auth_user_dto import AuthUserDTO
class UpdateAuthUserDTO(DtoABC):
def __init__(
self,
auth_user_dto: AuthUserDTO,
new_auth_user_dto: AuthUserDTO,
change_password: bool = False
self,
auth_user_dto: AuthUserDTO,
new_auth_user_dto: AuthUserDTO,
change_password: bool = False,
):
DtoABC.__init__(self)
@ -33,13 +32,15 @@ class UpdateAuthUserDTO(DtoABC):
return self._change_password
def from_dict(self, values: dict):
self._auth_user = AuthUserDTO().from_dict(values['authUser'])
self._new_auth_user = AuthUserDTO().from_dict(values['newAuthUser'])
self._change_password = False if 'changePassword' not in values else bool(values['changePassword'])
self._auth_user = AuthUserDTO().from_dict(values["authUser"])
self._new_auth_user = AuthUserDTO().from_dict(values["newAuthUser"])
self._change_password = (
False if "changePassword" not in values else bool(values["changePassword"])
)
def to_dict(self) -> dict:
return {
'authUser': self._auth_user,
'newAuthUser': self._new_auth_user,
'changePassword': self._change_password
"authUser": self._auth_user,
"newAuthUser": self._new_auth_user,
"changePassword": self._change_password,
}

View File

@ -6,7 +6,6 @@ from bot_api.abc.dto_abc import DtoABC
class VersionDTO(DtoABC):
def __init__(self, major: str = None, minor: str = None, micro: str = None):
DtoABC.__init__(self)
@ -28,16 +27,16 @@ class VersionDTO(DtoABC):
@property
def str(self) -> str:
return f'{self._major}.{self._minor}.{self._micro}'
return f"{self._major}.{self._minor}.{self._micro}"
def from_dict(self, values: dict):
self._major = values['major']
self._minor = values['minor']
self._micro = values['micro']
self._major = values["major"]
self._minor = values["minor"]
self._micro = values["micro"]
def to_dict(self) -> dict:
return {
'major': self._major,
'minor': self._minor,
'micro': self._micro,
"major": self._major,
"minor": self._minor,
"micro": self._micro,
}

View File

@ -11,16 +11,16 @@ Discord bot for the Keksdose discord Server
"""
__title__ = 'bot_api.route'
__author__ = 'Sven Heidemann'
__license__ = 'MIT'
__copyright__ = 'Copyright (c) 2022 sh-edraft.de'
__version__ = '0.3.0'
__title__ = "bot_api.route"
__author__ = "Sven Heidemann"
__license__ = "MIT"
__copyright__ = "Copyright (c) 2022 sh-edraft.de"
__version__ = "0.3.0"
from collections import namedtuple
# imports:
VersionInfo = namedtuple('VersionInfo', 'major minor micro')
version_info = VersionInfo(major='0', minor='3', micro='0')
VersionInfo = namedtuple("VersionInfo", "major minor micro")
version_info = VersionInfo(major="0", minor="3", micro="0")

View File

@ -32,39 +32,43 @@ class Route:
@wraps(f)
async def decorator(*args, **kwargs):
token = None
if 'Authorization' in request.headers:
bearer = request.headers.get('Authorization')
if "Authorization" in request.headers:
bearer = request.headers.get("Authorization")
token = bearer.split()[1]
if token is None:
ex = ServiceException(ServiceErrorCode.Unauthorized, f'Token not set')
ex = ServiceException(ServiceErrorCode.Unauthorized, f"Token not set")
error = ErrorDTO(ex.error_code, ex.message)
return jsonify(error.to_dict()), 401
if cls._auth_users is None or cls._auth is None:
ex = ServiceException(ServiceErrorCode.Unauthorized, f'Authorize is not initialized')
ex = ServiceException(
ServiceErrorCode.Unauthorized, f"Authorize is not initialized"
)
error = ErrorDTO(ex.error_code, ex.message)
return jsonify(error.to_dict()), 401
if not cls._auth.verify_login(token):
ex = ServiceException(ServiceErrorCode.Unauthorized, f'Token expired')
ex = ServiceException(ServiceErrorCode.Unauthorized, f"Token expired")
error = ErrorDTO(ex.error_code, ex.message)
return jsonify(error.to_dict()), 401
token = cls._auth.decode_token(token)
if token is None or 'email' not in token:
ex = ServiceException(ServiceErrorCode.Unauthorized, f'Token invalid')
if token is None or "email" not in token:
ex = ServiceException(ServiceErrorCode.Unauthorized, f"Token invalid")
error = ErrorDTO(ex.error_code, ex.message)
return jsonify(error.to_dict()), 401
user = cls._auth_users.get_auth_user_by_email(token['email'])
user = cls._auth_users.get_auth_user_by_email(token["email"])
if user is None:
ex = ServiceException(ServiceErrorCode.Unauthorized, f'Token invalid')
ex = ServiceException(ServiceErrorCode.Unauthorized, f"Token invalid")
error = ErrorDTO(ex.error_code, ex.message)
return jsonify(error.to_dict()), 401
if role is not None and user.auth_role.value < role.value:
ex = ServiceException(ServiceErrorCode.Unauthorized, f'Role {role} required')
ex = ServiceException(
ServiceErrorCode.Unauthorized, f"Role {role} required"
)
error = ErrorDTO(ex.error_code, ex.message)
return jsonify(error.to_dict()), 403
@ -84,20 +88,20 @@ class Route:
@classmethod
def get(cls, path=None, **kwargs):
return cls.route(path, methods=['GET'], **kwargs)
return cls.route(path, methods=["GET"], **kwargs)
@classmethod
def post(cls, path=None, **kwargs):
return cls.route(path, methods=['POST'], **kwargs)
return cls.route(path, methods=["POST"], **kwargs)
@classmethod
def head(cls, path=None, **kwargs):
return cls.route(path, methods=['HEAD'], **kwargs)
return cls.route(path, methods=["HEAD"], **kwargs)
@classmethod
def put(cls, path=None, **kwargs):
return cls.route(path, methods=['PUT'], **kwargs)
return cls.route(path, methods=["PUT"], **kwargs)
@classmethod
def delete(cls, path=None, **kwargs):
return cls.route(path, methods=['DELETE'], **kwargs)
return cls.route(path, methods=["DELETE"], **kwargs)

View File

@ -11,16 +11,16 @@ Discord bot for the Keksdose discord Server
"""
__title__ = 'bot_api.service'
__author__ = 'Sven Heidemann'
__license__ = 'MIT'
__copyright__ = 'Copyright (c) 2022 sh-edraft.de'
__version__ = '0.3.0'
__title__ = "bot_api.service"
__author__ = "Sven Heidemann"
__license__ = "MIT"
__copyright__ = "Copyright (c) 2022 sh-edraft.de"
__version__ = "0.3.0"
from collections import namedtuple
# imports
VersionInfo = namedtuple('VersionInfo', 'major minor micro')
version_info = VersionInfo(major='0', minor='3', micro='0')
VersionInfo = namedtuple("VersionInfo", "major minor micro")
version_info = VersionInfo(major="0", minor="3", micro="0")

View File

@ -38,26 +38,24 @@ from bot_data.model.auth_role_enum import AuthRoleEnum
from bot_data.model.auth_user import AuthUser
from bot_data.model.auth_user_users_relation import AuthUserUsersRelation
_email_regex = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b'
_email_regex = r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b"
class AuthService(AuthServiceABC):
def __init__(
self,
env: ApplicationEnvironmentABC,
logger: ApiLogger,
bot: DiscordBotServiceABC,
db: DatabaseContextABC,
auth_users: AuthUserRepositoryABC,
users: UserRepositoryABC,
servers: ServerRepositoryABC,
# mailer: MailThread,
mailer: EMailClientABC,
t: TranslatePipe,
auth_settings: AuthenticationSettings,
frontend_settings: FrontendSettings,
self,
env: ApplicationEnvironmentABC,
logger: ApiLogger,
bot: DiscordBotServiceABC,
db: DatabaseContextABC,
auth_users: AuthUserRepositoryABC,
users: UserRepositoryABC,
servers: ServerRepositoryABC,
# mailer: MailThread,
mailer: EMailClientABC,
t: TranslatePipe,
auth_settings: AuthenticationSettings,
frontend_settings: FrontendSettings,
):
AuthServiceABC.__init__(self)
@ -75,7 +73,7 @@ class AuthService(AuthServiceABC):
@staticmethod
def _hash_sha256(password: str, salt: str) -> str:
return hashlib.sha256(f'{password}{salt}'.encode('utf-8')).hexdigest()
return hashlib.sha256(f"{password}{salt}".encode("utf-8")).hexdigest()
@staticmethod
def _is_email_valid(email: str) -> bool:
@ -87,14 +85,15 @@ class AuthService(AuthServiceABC):
def generate_token(self, user: AuthUser) -> str:
token = jwt.encode(
payload={
'user_id': user.id,
'email': user.email,
'role': user.auth_role.value,
'exp': datetime.now(tz=timezone.utc) + timedelta(days=self._auth_settings.token_expire_time),
'iss': self._auth_settings.issuer,
'aud': self._auth_settings.audience
"user_id": user.id,
"email": user.email,
"role": user.auth_role.value,
"exp": datetime.now(tz=timezone.utc)
+ timedelta(days=self._auth_settings.token_expire_time),
"iss": self._auth_settings.issuer,
"aud": self._auth_settings.audience,
},
key=CredentialManager.decrypt(self._auth_settings.secret_key)
key=CredentialManager.decrypt(self._auth_settings.secret_key),
)
return token
@ -105,111 +104,129 @@ class AuthService(AuthServiceABC):
key=CredentialManager.decrypt(self._auth_settings.secret_key),
issuer=self._auth_settings.issuer,
audience=self._auth_settings.audience,
algorithms=['HS256']
algorithms=["HS256"],
)
def get_decoded_token_from_request(self) -> dict:
token = None
if 'Authorization' in request.headers:
bearer = request.headers.get('Authorization')
if "Authorization" in request.headers:
bearer = request.headers.get("Authorization")
token = bearer.split()[1]
if token is None:
raise ServiceException(ServiceErrorCode.Unauthorized, f'Token not set')
raise ServiceException(ServiceErrorCode.Unauthorized, f"Token not set")
return jwt.decode(
token,
key=CredentialManager.decrypt(self._auth_settings.secret_key),
issuer=self._auth_settings.issuer,
audience=self._auth_settings.audience,
algorithms=['HS256']
algorithms=["HS256"],
)
def find_decoded_token_from_request(self) -> Optional[dict]:
token = None
if 'Authorization' in request.headers:
bearer = request.headers.get('Authorization')
if "Authorization" in request.headers:
bearer = request.headers.get("Authorization")
token = bearer.split()[1]
return jwt.decode(
token,
key=CredentialManager.decrypt(self._auth_settings.secret_key),
issuer=self._auth_settings.issuer,
audience=self._auth_settings.audience,
algorithms=['HS256']
) if token is not None else None
return (
jwt.decode(
token,
key=CredentialManager.decrypt(self._auth_settings.secret_key),
issuer=self._auth_settings.issuer,
audience=self._auth_settings.audience,
algorithms=["HS256"],
)
if token is not None
else None
)
def _create_and_save_refresh_token(self, user: AuthUser) -> str:
token = str(uuid.uuid4())
user.refresh_token = token
user.refresh_token_expire_time = datetime.now() + timedelta(days=self._auth_settings.refresh_token_expire_time)
user.refresh_token_expire_time = datetime.now() + timedelta(
days=self._auth_settings.refresh_token_expire_time
)
self._auth_users.update_auth_user(user)
self._db.save_changes()
return token
def _send_link_mail(self, email: str, subject: str, message: str):
url = self._frontend_settings.url
if not url.endswith('/'):
url = f'{url}/'
if not url.endswith("/"):
url = f"{url}/"
self._mailer.connect()
mail = EMail()
mail.add_header('Mime-Version: 1.0')
mail.add_header('Content-Type: text/plain; charset=utf-8')
mail.add_header('Content-Transfer-Encoding: quoted-printable')
mail.add_header("Mime-Version: 1.0")
mail.add_header("Content-Type: text/plain; charset=utf-8")
mail.add_header("Content-Transfer-Encoding: quoted-printable")
mail.add_receiver(str(email))
mail.subject = subject
mail.body = textwrap.dedent(f"""{message}
mail.body = textwrap.dedent(
f"""{message}
{self._t.transform('api.mail.automatic_mail').format(self._environment.application_name, self._environment.environment_name, self._environment.host_name)}
""")
"""
)
thr = Thread(target=self._mailer.send_mail, args=[mail])
thr.start()
def _send_confirmation_id_to_user(self, user: AuthUser):
url = self._frontend_settings.url
if not url.endswith('/'):
url = f'{url}/'
if not url.endswith("/"):
url = f"{url}/"
self._send_link_mail(
user.email,
self._t.transform('api.auth.confirmation.subject').format(user.first_name, user.last_name),
self._t.transform('api.auth.confirmation.message').format(url, user.confirmation_id)
self._t.transform("api.auth.confirmation.subject").format(
user.first_name, user.last_name
),
self._t.transform("api.auth.confirmation.message").format(
url, user.confirmation_id
),
)
def _send_forgot_password_id_to_user(self, user: AuthUser):
url = self._frontend_settings.url
if not url.endswith('/'):
url = f'{url}/'
if not url.endswith("/"):
url = f"{url}/"
self._send_link_mail(
user.email,
self._t.transform('api.auth.forgot_password.subject').format(user.first_name, user.last_name),
self._t.transform('api.auth.forgot_password.message').format(url, user.forgot_password_id)
self._t.transform("api.auth.forgot_password.subject").format(
user.first_name, user.last_name
),
self._t.transform("api.auth.forgot_password.message").format(
url, user.forgot_password_id
),
)
async def get_all_auth_users_async(self) -> List[AuthUserDTO]:
result = self._auth_users.get_all_auth_users() \
.select(lambda x: AUT.to_dto(x))
result = self._auth_users.get_all_auth_users().select(lambda x: AUT.to_dto(x))
return List(AuthUserDTO, result)
async def get_filtered_auth_users_async(self, criteria: AuthUserSelectCriteria) -> AuthUserFilteredResultDTO:
async def get_filtered_auth_users_async(
self, criteria: AuthUserSelectCriteria
) -> AuthUserFilteredResultDTO:
users = self._auth_users.get_filtered_auth_users(criteria)
result = users.result.select(lambda x: AUT.to_dto(x))
return AuthUserFilteredResultDTO(
List(AuthUserDTO, result),
users.total_count
)
return AuthUserFilteredResultDTO(List(AuthUserDTO, result), users.total_count)
async def get_auth_user_by_email_async(self, email: str, with_password: bool = False) -> AuthUserDTO:
async def get_auth_user_by_email_async(
self, email: str, with_password: bool = False
) -> AuthUserDTO:
try:
# todo: check if logged in user is admin then send mail
user = self._auth_users.get_auth_user_by_email(email)
return AUT.to_dto(user, password=user.password if with_password else None)
except Exception as e:
self._logger.error(__name__, f'AuthUser not found', e)
raise ServiceException(ServiceErrorCode.InvalidData, f'User not found {email}')
self._logger.error(__name__, f"AuthUser not found", e)
raise ServiceException(
ServiceErrorCode.InvalidData, f"User not found {email}"
)
async def find_auth_user_by_email_async(self, email: str) -> Optional[AuthUser]:
user = self._auth_users.find_auth_user_by_email(email)
@ -218,7 +235,7 @@ class AuthService(AuthServiceABC):
async def add_auth_user_async(self, user_dto: AuthUserDTO):
db_user = self._auth_users.find_auth_user_by_email(user_dto.email)
if db_user is not None:
raise ServiceException(ServiceErrorCode.InvalidUser, 'User already exists')
raise ServiceException(ServiceErrorCode.InvalidUser, "User already exists")
user = AUT.to_db(user_dto)
if self._auth_users.get_all_auth_users().count() == 0:
@ -227,66 +244,86 @@ class AuthService(AuthServiceABC):
user.password_salt = uuid.uuid4()
user.password = self._hash_sha256(user_dto.password, user.password_salt)
if not self._is_email_valid(user.email):
raise ServiceException(ServiceErrorCode.InvalidData, 'Invalid E-Mail address')
raise ServiceException(
ServiceErrorCode.InvalidData, "Invalid E-Mail address"
)
try:
user.confirmation_id = uuid.uuid4()
self._auth_users.add_auth_user(user)
self._send_confirmation_id_to_user(user)
self._db.save_changes()
self._logger.info(__name__, f'Added auth user with E-Mail: {user_dto.email}')
self._logger.info(
__name__, f"Added auth user with E-Mail: {user_dto.email}"
)
except Exception as e:
self._logger.error(__name__, f'Cannot add user with E-Mail {user_dto.email}', e)
self._logger.error(
__name__, f"Cannot add user with E-Mail {user_dto.email}", e
)
raise ServiceException(ServiceErrorCode.UnableToAdd, "Invalid E-Mail")
async def add_auth_user_by_oauth_async(self, dto: OAuthDTO):
db_user = self._auth_users.find_auth_user_by_email(dto.user.email)
if db_user is None:
raise ServiceException(ServiceErrorCode.InvalidUser, 'User not found')
raise ServiceException(ServiceErrorCode.InvalidUser, "User not found")
if db_user.oauth_id != dto.oauth_id:
raise ServiceException(ServiceErrorCode.InvalidUser, 'Wrong OAuthId')
raise ServiceException(ServiceErrorCode.InvalidUser, "Wrong OAuthId")
try:
db_user.first_name = dto.user.first_name
db_user.last_name = dto.user.last_name
db_user.password_salt = uuid.uuid4()
db_user.password = self._hash_sha256(dto.user.password, db_user.password_salt)
db_user.password = self._hash_sha256(
dto.user.password, db_user.password_salt
)
db_user.oauth_id = None
db_user.confirmation_id = uuid.uuid4()
self._send_confirmation_id_to_user(db_user)
self._auth_users.update_auth_user(db_user)
self._logger.info(__name__, f'Added auth user with E-Mail: {dto.user.email}')
self._logger.info(
__name__, f"Added auth user with E-Mail: {dto.user.email}"
)
except Exception as e:
self._logger.error(__name__, f'Cannot add user with E-Mail {dto.user.email}', e)
self._logger.error(
__name__, f"Cannot add user with E-Mail {dto.user.email}", e
)
raise ServiceException(ServiceErrorCode.UnableToAdd, "Invalid E-Mail")
self._db.save_changes()
async def add_auth_user_by_discord_async(self, user_dto: AuthUserDTO, dc_id: int) -> OAuthDTO:
async def add_auth_user_by_discord_async(
self, user_dto: AuthUserDTO, dc_id: int
) -> OAuthDTO:
db_auth_user = self._auth_users.find_auth_user_by_email(user_dto.email)
# user exists
if db_auth_user is not None and db_auth_user.users.count() > 0:
# raise ServiceException(ServiceErrorCode.InvalidUser, 'User already exists')
self._logger.debug(__name__, f'Discord user already exists')
self._logger.debug(__name__, f"Discord user already exists")
return OAuthDTO(AUT.to_dto(db_auth_user), None)
# user exists but discord user id not set
elif db_auth_user is not None and db_auth_user.users.count() == 0:
self._logger.debug(__name__, f'Auth user exists but not linked with discord')
self._logger.debug(
__name__, f"Auth user exists but not linked with discord"
)
# users = self._users.get_users_by_discord_id(user_dto.user_id)
# add auth_user to user refs
db_auth_user.oauth_id = None
else:
# user does not exists
self._logger.debug(__name__, f'Auth user does not exist')
self._logger.debug(__name__, f"Auth user does not exist")
try:
user_dto.user_id = self._users.get_users_by_discord_id(user_dto.user_id).single().user_id
user_dto.user_id = (
self._users.get_users_by_discord_id(user_dto.user_id)
.single()
.user_id
)
except Exception as e:
self._logger.error(__name__, f'User not found')
self._logger.error(__name__, f"User not found")
user_dto.user_id = None
await self.add_auth_user_async(user_dto)
@ -303,7 +340,9 @@ class AuthService(AuthServiceABC):
for user in users:
if user.server.server_id != server.server_id:
continue
self._auth_users.add_auth_user_user_rel(AuthUserUsersRelation(db_auth_user, user))
self._auth_users.add_auth_user_user_rel(
AuthUserUsersRelation(db_auth_user, user)
)
self._auth_users.update_auth_user(db_auth_user)
self._db.save_changes()
@ -311,98 +350,167 @@ class AuthService(AuthServiceABC):
async def update_user_async(self, update_user_dto: UpdateAuthUserDTO):
if update_user_dto is None:
raise ServiceException(ServiceErrorCode.InvalidData, f'User is empty')
raise ServiceException(ServiceErrorCode.InvalidData, f"User is empty")
if update_user_dto.auth_user is None:
raise ServiceException(ServiceErrorCode.InvalidData, f'Existing user is empty')
raise ServiceException(
ServiceErrorCode.InvalidData, f"Existing user is empty"
)
if update_user_dto.new_auth_user is None:
raise ServiceException(ServiceErrorCode.InvalidData, f'New user is empty')
raise ServiceException(ServiceErrorCode.InvalidData, f"New user is empty")
if not self._is_email_valid(update_user_dto.auth_user.email) or not self._is_email_valid(update_user_dto.new_auth_user.email):
raise ServiceException(ServiceErrorCode.InvalidData, f'Invalid E-Mail')
if not self._is_email_valid(
update_user_dto.auth_user.email
) or not self._is_email_valid(update_user_dto.new_auth_user.email):
raise ServiceException(ServiceErrorCode.InvalidData, f"Invalid E-Mail")
user = self._auth_users.find_auth_user_by_email(update_user_dto.auth_user.email)
if user is None:
raise ServiceException(ServiceErrorCode.InvalidUser, 'User not found')
raise ServiceException(ServiceErrorCode.InvalidUser, "User not found")
if user.confirmation_id is not None:
raise ServiceException(ServiceErrorCode.InvalidUser, 'E-Mail not confirmed')
raise ServiceException(ServiceErrorCode.InvalidUser, "E-Mail not confirmed")
# update first name
if update_user_dto.new_auth_user.first_name is not None and update_user_dto.auth_user.first_name != update_user_dto.new_auth_user.first_name:
if (
update_user_dto.new_auth_user.first_name is not None
and update_user_dto.auth_user.first_name
!= update_user_dto.new_auth_user.first_name
):
user.first_name = update_user_dto.new_auth_user.first_name
# update last name
if update_user_dto.new_auth_user.last_name is not None and update_user_dto.new_auth_user.last_name != '' and \
update_user_dto.auth_user.last_name != update_user_dto.new_auth_user.last_name:
if (
update_user_dto.new_auth_user.last_name is not None
and update_user_dto.new_auth_user.last_name != ""
and update_user_dto.auth_user.last_name
!= update_user_dto.new_auth_user.last_name
):
user.last_name = update_user_dto.new_auth_user.last_name
# update E-Mail
if update_user_dto.new_auth_user.email is not None and update_user_dto.new_auth_user.email != '' and update_user_dto.auth_user.email != update_user_dto.new_auth_user.email:
user_by_new_e_mail = self._auth_users.find_auth_user_by_email(update_user_dto.new_auth_user.email)
if (
update_user_dto.new_auth_user.email is not None
and update_user_dto.new_auth_user.email != ""
and update_user_dto.auth_user.email != update_user_dto.new_auth_user.email
):
user_by_new_e_mail = self._auth_users.find_auth_user_by_email(
update_user_dto.new_auth_user.email
)
if user_by_new_e_mail is not None:
raise ServiceException(ServiceErrorCode.InvalidUser, 'User already exists')
raise ServiceException(
ServiceErrorCode.InvalidUser, "User already exists"
)
user.email = update_user_dto.new_auth_user.email
update_user_dto.auth_user.password = self._hash_sha256(update_user_dto.auth_user.password, user.password_salt)
update_user_dto.auth_user.password = self._hash_sha256(
update_user_dto.auth_user.password, user.password_salt
)
if update_user_dto.auth_user.password != user.password:
raise ServiceException(ServiceErrorCode.InvalidUser, 'Wrong password')
raise ServiceException(ServiceErrorCode.InvalidUser, "Wrong password")
# update password
if update_user_dto.new_auth_user.password is not None and self._hash_sha256(update_user_dto.new_auth_user.password, user.password_salt) != user.password:
if (
update_user_dto.new_auth_user.password is not None
and self._hash_sha256(
update_user_dto.new_auth_user.password, user.password_salt
)
!= user.password
):
user.password_salt = uuid.uuid4()
user.password = self._hash_sha256(update_user_dto.new_auth_user.password, user.password_salt)
user.password = self._hash_sha256(
update_user_dto.new_auth_user.password, user.password_salt
)
self._auth_users.update_auth_user(user)
self._db.save_changes()
async def update_user_as_admin_async(self, update_user_dto: UpdateAuthUserDTO):
if update_user_dto is None:
raise ServiceException(ServiceErrorCode.InvalidData, f'User is empty')
raise ServiceException(ServiceErrorCode.InvalidData, f"User is empty")
if update_user_dto.auth_user is None:
raise ServiceException(ServiceErrorCode.InvalidData, f'Existing user is empty')
raise ServiceException(
ServiceErrorCode.InvalidData, f"Existing user is empty"
)
if update_user_dto.new_auth_user is None:
raise ServiceException(ServiceErrorCode.InvalidData, f'New user is empty')
raise ServiceException(ServiceErrorCode.InvalidData, f"New user is empty")
if not self._is_email_valid(update_user_dto.auth_user.email) or not self._is_email_valid(update_user_dto.new_auth_user.email):
raise ServiceException(ServiceErrorCode.InvalidData, f'Invalid E-Mail')
if not self._is_email_valid(
update_user_dto.auth_user.email
) or not self._is_email_valid(update_user_dto.new_auth_user.email):
raise ServiceException(ServiceErrorCode.InvalidData, f"Invalid E-Mail")
user = self._auth_users.find_auth_user_by_email(update_user_dto.auth_user.email)
if user is None:
raise ServiceException(ServiceErrorCode.InvalidUser, 'User not found')
raise ServiceException(ServiceErrorCode.InvalidUser, "User not found")
if user.confirmation_id is not None and update_user_dto.new_auth_user.is_confirmed:
if (
user.confirmation_id is not None
and update_user_dto.new_auth_user.is_confirmed
):
user.confirmation_id = None
elif user.confirmation_id is None and not update_user_dto.new_auth_user.is_confirmed:
elif (
user.confirmation_id is None
and not update_user_dto.new_auth_user.is_confirmed
):
user.confirmation_id = uuid.uuid4()
# else
# raise ServiceException(ServiceErrorCode.InvalidUser, 'E-Mail not confirmed')
# update first name
if update_user_dto.new_auth_user.first_name is not None and update_user_dto.auth_user.first_name != update_user_dto.new_auth_user.first_name:
if (
update_user_dto.new_auth_user.first_name is not None
and update_user_dto.auth_user.first_name
!= update_user_dto.new_auth_user.first_name
):
user.first_name = update_user_dto.new_auth_user.first_name
# update last name
if update_user_dto.new_auth_user.last_name is not None and update_user_dto.new_auth_user.last_name != '' and update_user_dto.auth_user.last_name != update_user_dto.new_auth_user.last_name:
if (
update_user_dto.new_auth_user.last_name is not None
and update_user_dto.new_auth_user.last_name != ""
and update_user_dto.auth_user.last_name
!= update_user_dto.new_auth_user.last_name
):
user.last_name = update_user_dto.new_auth_user.last_name
# update E-Mail
if update_user_dto.new_auth_user.email is not None and update_user_dto.new_auth_user.email != '' and update_user_dto.auth_user.email != update_user_dto.new_auth_user.email:
user_by_new_e_mail = self._auth_users.find_auth_user_by_email(update_user_dto.new_auth_user.email)
if (
update_user_dto.new_auth_user.email is not None
and update_user_dto.new_auth_user.email != ""
and update_user_dto.auth_user.email != update_user_dto.new_auth_user.email
):
user_by_new_e_mail = self._auth_users.find_auth_user_by_email(
update_user_dto.new_auth_user.email
)
if user_by_new_e_mail is not None:
raise ServiceException(ServiceErrorCode.InvalidUser, 'User already exists')
raise ServiceException(
ServiceErrorCode.InvalidUser, "User already exists"
)
user.email = update_user_dto.new_auth_user.email
# update password
if update_user_dto.new_auth_user.password is not None and update_user_dto.change_password and user.password != self._hash_sha256(update_user_dto.new_auth_user.password, user.password_salt):
if (
update_user_dto.new_auth_user.password is not None
and update_user_dto.change_password
and user.password
!= self._hash_sha256(
update_user_dto.new_auth_user.password, user.password_salt
)
):
user.password_salt = uuid.uuid4()
user.password = self._hash_sha256(update_user_dto.new_auth_user.password, user.password_salt)
user.password = self._hash_sha256(
update_user_dto.new_auth_user.password, user.password_salt
)
# update role
if user.auth_role == update_user_dto.auth_user.auth_role and user.auth_role != update_user_dto.new_auth_user.auth_role:
if (
user.auth_role == update_user_dto.auth_user.auth_role
and user.auth_role != update_user_dto.new_auth_user.auth_role
):
user.auth_role = update_user_dto.new_auth_user.auth_role
self._auth_users.update_auth_user(user)
@ -414,43 +522,48 @@ class AuthService(AuthServiceABC):
self._auth_users.delete_auth_user(user)
self._db.save_changes()
except Exception as e:
self._logger.error(__name__, f'Cannot delete user', e)
raise ServiceException(ServiceErrorCode.UnableToDelete, f'Cannot delete user by mail {email}')
self._logger.error(__name__, f"Cannot delete user", e)
raise ServiceException(
ServiceErrorCode.UnableToDelete, f"Cannot delete user by mail {email}"
)
async def delete_auth_user_async(self, user_dto: AuthUser):
try:
self._auth_users.delete_auth_user(AUT.to_db(user_dto))
self._db.save_changes()
except Exception as e:
self._logger.error(__name__, f'Cannot delete user', e)
raise ServiceException(ServiceErrorCode.UnableToDelete, f'Cannot delete user by mail {user_dto.email}')
self._logger.error(__name__, f"Cannot delete user", e)
raise ServiceException(
ServiceErrorCode.UnableToDelete,
f"Cannot delete user by mail {user_dto.email}",
)
def verify_login(self, token_str: str) -> bool:
try:
token = self.decode_token(token_str)
if token is None or 'email' not in token:
raise ServiceException(ServiceErrorCode.InvalidData, 'Token invalid')
if token is None or "email" not in token:
raise ServiceException(ServiceErrorCode.InvalidData, "Token invalid")
user = self._auth_users.find_auth_user_by_email(token['email'])
user = self._auth_users.find_auth_user_by_email(token["email"])
if user is None:
raise ServiceException(ServiceErrorCode.InvalidData, 'Token expired')
raise ServiceException(ServiceErrorCode.InvalidData, "Token expired")
except Exception as e:
self._logger.error(__name__, f'Token invalid', e)
self._logger.error(__name__, f"Token invalid", e)
return False
return True
async def login_async(self, user_dto: AuthUser) -> TokenDTO:
if user_dto is None:
raise ServiceException(ServiceErrorCode.InvalidData, 'User not set')
raise ServiceException(ServiceErrorCode.InvalidData, "User not set")
db_user = self._auth_users.find_auth_user_by_email(user_dto.email)
if db_user is None:
raise ServiceException(ServiceErrorCode.InvalidUser, f'User not found')
raise ServiceException(ServiceErrorCode.InvalidUser, f"User not found")
user_dto.password = self._hash_sha256(user_dto.password, db_user.password_salt)
if db_user.password != user_dto.password:
raise ServiceException(ServiceErrorCode.InvalidUser, 'Wrong password')
raise ServiceException(ServiceErrorCode.InvalidUser, "Wrong password")
token = self.generate_token(db_user)
refresh_token = self._create_and_save_refresh_token(db_user)
@ -462,7 +575,7 @@ class AuthService(AuthServiceABC):
async def login_discord_async(self, user_dto: AuthUserDTO) -> TokenDTO:
if user_dto is None:
raise ServiceException(ServiceErrorCode.InvalidData, 'User not set')
raise ServiceException(ServiceErrorCode.InvalidData, "User not set")
db_user = self._auth_users.find_auth_user_by_email(user_dto.email)
if db_user is None:
@ -480,38 +593,52 @@ class AuthService(AuthServiceABC):
async def refresh_async(self, token_dto: TokenDTO) -> TokenDTO:
if token_dto is None:
raise ServiceException(ServiceErrorCode.InvalidData, f'Token not set')
raise ServiceException(ServiceErrorCode.InvalidData, f"Token not set")
try:
token = self.decode_token(token_dto.token)
if token is None or 'email' not in token:
raise ServiceException(ServiceErrorCode.InvalidData, 'Token invalid')
if token is None or "email" not in token:
raise ServiceException(ServiceErrorCode.InvalidData, "Token invalid")
user = self._auth_users.get_auth_user_by_email(token['email'])
if user is None or user.refresh_token != token_dto.refresh_token or user.refresh_token_expire_time <= datetime.now():
raise ServiceException(ServiceErrorCode.InvalidData, 'Token expired')
user = self._auth_users.get_auth_user_by_email(token["email"])
if (
user is None
or user.refresh_token != token_dto.refresh_token
or user.refresh_token_expire_time <= datetime.now()
):
raise ServiceException(ServiceErrorCode.InvalidData, "Token expired")
return TokenDTO(self.generate_token(user), self._create_and_save_refresh_token(user))
return TokenDTO(
self.generate_token(user), self._create_and_save_refresh_token(user)
)
except Exception as e:
self._logger.error(__name__, f'Refreshing token failed', e)
return TokenDTO('', '')
self._logger.error(__name__, f"Refreshing token failed", e)
return TokenDTO("", "")
async def revoke_async(self, token_dto: TokenDTO):
if token_dto is None or token_dto.token is None or token_dto.refresh_token is None:
raise ServiceException(ServiceErrorCode.InvalidData, 'Token not set')
if (
token_dto is None
or token_dto.token is None
or token_dto.refresh_token is None
):
raise ServiceException(ServiceErrorCode.InvalidData, "Token not set")
try:
token = self.decode_token(token_dto.token)
user = self._auth_users.get_auth_user_by_email(token['email'])
if user is None or user.refresh_token != token_dto.refresh_token or user.refresh_token_expire_time <= datetime.now():
raise ServiceException(ServiceErrorCode.InvalidData, 'Token expired')
user = self._auth_users.get_auth_user_by_email(token["email"])
if (
user is None
or user.refresh_token != token_dto.refresh_token
or user.refresh_token_expire_time <= datetime.now()
):
raise ServiceException(ServiceErrorCode.InvalidData, "Token expired")
user.refresh_token = None
self._auth_users.update_auth_user(user)
self._db.save_changes()
except Exception as e:
self._logger.error(__name__, f'Refreshing token failed', e)
self._logger.error(__name__, f"Refreshing token failed", e)
async def confirm_email_async(self, id: str) -> bool:
user = self._auth_users.find_auth_user_by_confirmation_id(id)
@ -540,13 +667,18 @@ class AuthService(AuthServiceABC):
async def reset_password_async(self, rp_dto: ResetPasswordDTO):
user = self._auth_users.find_auth_user_by_forgot_password_id(rp_dto.id)
if user is None:
raise ServiceException(ServiceErrorCode.InvalidUser, f'User by forgot password id {rp_dto.id} not found')
raise ServiceException(
ServiceErrorCode.InvalidUser,
f"User by forgot password id {rp_dto.id} not found",
)
if user.confirmation_id is not None:
raise ServiceException(ServiceErrorCode.InvalidUser, f'E-Mail not confirmed')
raise ServiceException(
ServiceErrorCode.InvalidUser, f"E-Mail not confirmed"
)
if user.password is None or rp_dto.password == '':
raise ServiceException(ServiceErrorCode.InvalidData, f'Password not set')
if user.password is None or rp_dto.password == "":
raise ServiceException(ServiceErrorCode.InvalidData, f"Password not set")
user.password_salt = uuid.uuid4()
user.password = self._hash_sha256(rp_dto.password, user.password_salt)

View File

@ -20,14 +20,13 @@ from bot_data.model.server import Server
class DiscordService:
def __init__(
self,
bot: DiscordBotServiceABC,
servers: ServerRepositoryABC,
auth: AuthServiceABC,
auth_users: AuthUserRepositoryABC,
users: UserRepositoryABC,
self,
bot: DiscordBotServiceABC,
servers: ServerRepositoryABC,
auth: AuthServiceABC,
auth_users: AuthUserRepositoryABC,
users: UserRepositoryABC,
):
self._bot = bot
self._servers = servers
@ -38,68 +37,70 @@ class DiscordService:
def _to_dto(self, x: Server) -> Optional[ServerDTO]:
guild = self._bot.get_guild(x.discord_server_id)
if guild is None:
return ServerTransformer.to_dto(
x,
'',
0,
None
)
return ServerTransformer.to_dto(x, "", 0, None)
return ServerTransformer.to_dto(
x,
guild.name,
guild.member_count,
guild.icon
)
return ServerTransformer.to_dto(x, guild.name, guild.member_count, guild.icon)
async def get_all_servers(self) -> List[ServerDTO]:
servers = List(ServerDTO, self._servers.get_servers())
return servers.select(self._to_dto).where(lambda x: x.name != '')
return servers.select(self._to_dto).where(lambda x: x.name != "")
async def get_all_servers_by_user(self) -> List[ServerDTO]:
token = self._auth.get_decoded_token_from_request()
if token is None or 'email' not in token or 'role' not in token:
raise ServiceException(ServiceErrorCode.InvalidData, 'Token invalid')
if token is None or "email" not in token or "role" not in token:
raise ServiceException(ServiceErrorCode.InvalidData, "Token invalid")
role = AuthRoleEnum(token['role'])
role = AuthRoleEnum(token["role"])
servers = self._servers.get_servers()
if role != AuthRoleEnum.admin:
auth_user = self._auth_users.find_auth_user_by_email(token['email'])
auth_user = self._auth_users.find_auth_user_by_email(token["email"])
if auth_user is not None:
user_ids = auth_user.users.select(lambda x: x.server is not None and x.server.server_id)
user_ids = auth_user.users.select(
lambda x: x.server is not None and x.server.server_id
)
servers = servers.where(lambda x: x.server_id in user_ids)
servers = List(ServerDTO, servers)
return servers.select(self._to_dto).where(lambda x: x.name != '')
return servers.select(self._to_dto).where(lambda x: x.name != "")
async def get_filtered_servers_async(self, criteria: ServerSelectCriteria) -> ServerFilteredResultDTO:
async def get_filtered_servers_async(
self, criteria: ServerSelectCriteria
) -> ServerFilteredResultDTO:
token = self._auth.get_decoded_token_from_request()
if token is None or 'email' not in token or 'role' not in token:
raise ServiceException(ServiceErrorCode.InvalidData, 'Token invalid')
if token is None or "email" not in token or "role" not in token:
raise ServiceException(ServiceErrorCode.InvalidData, "Token invalid")
role = AuthRoleEnum(token['role'])
role = AuthRoleEnum(token["role"])
filtered_result = self._servers.get_filtered_servers(criteria)
# filter out servers, where the user not exists
if role != AuthRoleEnum.admin:
auth_user = self._auth_users.find_auth_user_by_email(token['email'])
auth_user = self._auth_users.find_auth_user_by_email(token["email"])
if auth_user is not None:
user_ids = auth_user.users.select(lambda x: x.server is not None and x.server.server_id)
filtered_result.result = filtered_result.result.where(lambda x: x.server_id in user_ids)
user_ids = auth_user.users.select(
lambda x: x.server is not None and x.server.server_id
)
filtered_result.result = filtered_result.result.where(
lambda x: x.server_id in user_ids
)
servers: List = filtered_result.result.select(self._to_dto).where(lambda x: x.name != '')
servers: List = filtered_result.result.select(self._to_dto).where(
lambda x: x.name != ""
)
result = List(ServerDTO, servers)
if criteria.name is not None and criteria.name != '':
result = result.where(lambda x: criteria.name.lower() in x.name.lower() or x.name.lower() == criteria.name.lower())
if criteria.name is not None and criteria.name != "":
result = result.where(
lambda x: criteria.name.lower() in x.name.lower()
or x.name.lower() == criteria.name.lower()
)
return ServerFilteredResultDTO(
List(ServerDTO, result),
servers.count()
)
return ServerFilteredResultDTO(List(ServerDTO, result), servers.count())
async def get_server_by_id_async(self, id: int) -> ServerDTO:
server = self._servers.get_server_by_id(id)
guild = self._bot.get_guild(server.discord_server_id)
server_dto = ServerTransformer.to_dto(server, guild.name, guild.member_count, guild.icon)
server_dto = ServerTransformer.to_dto(
server, guild.name, guild.member_count, guild.icon
)
return server_dto

View File

@ -11,16 +11,16 @@ Discord bot for the Keksdose discord Server
"""
__title__ = 'bot_api.transformer'
__author__ = 'Sven Heidemann'
__license__ = 'MIT'
__copyright__ = 'Copyright (c) 2022 sh-edraft.de'
__version__ = '0.3.0'
__title__ = "bot_api.transformer"
__author__ = "Sven Heidemann"
__license__ = "MIT"
__copyright__ = "Copyright (c) 2022 sh-edraft.de"
__version__ = "0.3.0"
from collections import namedtuple
# imports:
VersionInfo = namedtuple('VersionInfo', 'major minor micro')
version_info = VersionInfo(major='0', minor='3', micro='0')
VersionInfo = namedtuple("VersionInfo", "major minor micro")
version_info = VersionInfo(major="0", minor="3", micro="0")

View File

@ -7,7 +7,6 @@ from bot_data.model.auth_user import AuthUser
class AuthUserTransformer(TransformerABC):
@staticmethod
def to_db(dto: AuthUserDTO) -> AuthUser:
return AuthUser(
@ -21,8 +20,10 @@ class AuthUserTransformer(TransformerABC):
None,
None,
datetime.now(),
AuthRoleEnum.normal if dto.auth_role is None else AuthRoleEnum(dto.auth_role),
auth_user_id=0 if dto.id is None else dto.id
AuthRoleEnum.normal
if dto.auth_role is None
else AuthRoleEnum(dto.auth_role),
auth_user_id=0 if dto.id is None else dto.id,
)
@staticmethod
@ -32,7 +33,7 @@ class AuthUserTransformer(TransformerABC):
db.first_name,
db.last_name,
db.email,
'' if password is None else password,
"" if password is None else password,
db.confirmation_id,
db.auth_role
db.auth_role,
)

View File

@ -8,13 +8,14 @@ from bot_data.model.server import Server
class ServerTransformer(TransformerABC):
@staticmethod
def to_db(dto: ServerDTO) -> Server:
return Server(dto.discord_id)
@staticmethod
def to_dto(db: Server, name: str, member_count: int, icon_url: Optional[discord.Asset]) -> ServerDTO:
def to_dto(
db: Server, name: str, member_count: int, icon_url: Optional[discord.Asset]
) -> ServerDTO:
return ServerDTO(
db.server_id,
db.discord_server_id,

View File

@ -11,16 +11,16 @@ Discord bot for the Keksdose discord Server
"""
__title__ = 'bot_core'
__author__ = 'Sven Heidemann'
__license__ = 'MIT'
__copyright__ = 'Copyright (c) 2022 sh-edraft.de'
__version__ = '0.3.0'
__title__ = "bot_core"
__author__ = "Sven Heidemann"
__license__ = "MIT"
__copyright__ = "Copyright (c) 2022 sh-edraft.de"
__version__ = "0.3.0"
from collections import namedtuple
# imports
VersionInfo = namedtuple('VersionInfo', 'major minor micro')
version_info = VersionInfo(major='0', minor='3', micro='0')
VersionInfo = namedtuple("VersionInfo", "major minor micro")
version_info = VersionInfo(major="0", minor="3", micro="0")

View File

@ -11,16 +11,16 @@ Discord bot for the Keksdose discord Server
"""
__title__ = 'bot_core.abc'
__author__ = 'Sven Heidemann'
__license__ = 'MIT'
__copyright__ = 'Copyright (c) 2022 sh-edraft.de'
__version__ = '0.3.0'
__title__ = "bot_core.abc"
__author__ = "Sven Heidemann"
__license__ = "MIT"
__copyright__ = "Copyright (c) 2022 sh-edraft.de"
__version__ = "0.3.0"
from collections import namedtuple
# imports:
VersionInfo = namedtuple('VersionInfo', 'major minor micro')
version_info = VersionInfo(major='0', minor='3', micro='0')
VersionInfo = namedtuple("VersionInfo", "major minor micro")
version_info = VersionInfo(major="0", minor="3", micro="0")

View File

@ -10,39 +10,54 @@ from modules.base.configuration.base_server_settings import BaseServerSettings
class ClientUtilsABC(ABC):
@abstractmethod
def __init__(self):
pass
@abstractmethod
def __init__(self): pass
def received_command(self, guild_id: int):
pass
@abstractmethod
def received_command(self, guild_id: int): pass
def moved_user(self, guild_id: int):
pass
@abstractmethod
def moved_user(self, guild_id: int): pass
def moved_users(self, guild_id: int, count: int):
pass
@abstractmethod
def moved_users(self, guild_id: int, count: int): pass
def get_client(self, dc_ic: int, guild_id: int):
pass
@abstractmethod
def get_client(self, dc_ic: int, guild_id: int): pass
async def check_if_bot_is_ready_yet(self) -> bool:
pass
@abstractmethod
async def check_if_bot_is_ready_yet(self) -> bool: pass
async def check_if_bot_is_ready_yet_and_respond(self, ctx: Context) -> bool:
pass
@abstractmethod
async def check_if_bot_is_ready_yet_and_respond(self, ctx: Context) -> bool: pass
async def presence_game(self, t_key: str):
pass
@abstractmethod
async def presence_game(self, t_key: str): pass
@abstractmethod
def get_auto_complete_list(self, _l: List, current: str, select: Callable = None) -> List: pass
def get_auto_complete_list(
self, _l: List, current: str, select: Callable = None
) -> List:
pass
@abstractmethod
def is_message_xp_count_by_hour_higher_that_max_message_count_per_hour(
self, created_at: datetime, user: User, settings: BaseServerSettings,
is_reaction: bool = False
) -> bool: pass
self,
created_at: datetime,
user: User,
settings: BaseServerSettings,
is_reaction: bool = False,
) -> bool:
pass
@abstractmethod
def get_ontime_for_user(self, user: User) -> float: pass
def get_ontime_for_user(self, user: User) -> float:
pass

View File

@ -9,11 +9,18 @@ from bot_core.configuration.file_logging_settings import FileLoggingSettings
class CustomFileLoggerABC(Logger, ABC):
@abstractmethod
def __init__(self, key: str, config: ConfigurationABC, time_format: TimeFormatSettings, env: ApplicationEnvironmentABC):
def __init__(
self,
key: str,
config: ConfigurationABC,
time_format: TimeFormatSettings,
env: ApplicationEnvironmentABC,
):
self._key = key
self._settings: LoggingSettings = config.get_configuration(f'{FileLoggingSettings.__name__}_{key}')
self._settings: LoggingSettings = config.get_configuration(
f"{FileLoggingSettings.__name__}_{key}"
)
Logger.__init__(self, self._settings, time_format, env)
self._begin_log()
@ -24,41 +31,43 @@ class CustomFileLoggerABC(Logger, ABC):
def _begin_log(self):
console_level = self._console.value
self._console = LoggingLevelEnum.OFF
self.info(__name__, f'Starting...')
self.info(__name__, f"Starting...")
self._console = LoggingLevelEnum(console_level)
def _get_string(self, name_list_as_str: str, level: LoggingLevelEnum, message: str) -> str:
names = name_list_as_str.split(' ')
def _get_string(
self, name_list_as_str: str, level: LoggingLevelEnum, message: str
) -> str:
names = name_list_as_str.split(" ")
log_level = level.name
string = f'<{self._get_datetime_now()}> [ {log_level} ]'
string = f"<{self._get_datetime_now()}> [ {log_level} ]"
for name in names:
string += f' [ {name} ]'
string += f': {message}'
string += f" [ {name} ]"
string += f": {message}"
return string
def header(self, string: str):
super().header(string)
def trace(self, name: str, message: str):
name = f'{name} {self._key}'
name = f"{name} {self._key}"
super().trace(name, message)
def debug(self, name: str, message: str):
name = f'{name} {self._key}'
name = f"{name} {self._key}"
super().debug(name, message)
def info(self, name: str, message: str):
name = f'{name} {self._key}'
name = f"{name} {self._key}"
super().info(name, message)
def warn(self, name: str, message: str):
name = f'{name} {self._key}'
name = f"{name} {self._key}"
super().warn(name, message)
def error(self, name: str, message: str, ex: Exception = None):
name = f'{name} {self._key}'
name = f"{name} {self._key}"
super().error(name, message, ex)
def fatal(self, name: str, message: str, ex: Exception = None):
name = f'{name} {self._key}'
name = f"{name} {self._key}"
super().fatal(name, message, ex)

View File

@ -8,35 +8,60 @@ from discord.ext.commands import Context
class MessageServiceABC(ABC):
@abstractmethod
def __init__(self):
pass
@abstractmethod
def __init__(self): pass
async def delete_messages(
self, messages: List[discord.Message], guild_id: int, without_tracking=False
):
pass
@abstractmethod
async def delete_messages(self, messages: List[discord.Message], guild_id: int, without_tracking=False): pass
@abstractmethod
async def delete_message(self, message: discord.Message, without_tracking=False): pass
async def delete_message(self, message: discord.Message, without_tracking=False):
pass
@abstractmethod
async def send_channel_message(
self, channel: discord.TextChannel, message: Union[str, discord.Embed], without_tracking=True
): pass
self,
channel: discord.TextChannel,
message: Union[str, discord.Embed],
without_tracking=True,
):
pass
@abstractmethod
async def send_dm_message(
self, message: Union[str, discord.Embed], receiver: Union[discord.User, discord.Member],
without_tracking=False
): pass
self,
message: Union[str, discord.Embed],
receiver: Union[discord.User, discord.Member],
without_tracking=False,
):
pass
@abstractmethod
async def send_ctx_msg(
self, ctx: Context, message: Union[str, discord.Embed], file: discord.File = None,
is_persistent: bool = False, is_public: bool = False, wait_before_delete: int = None, without_tracking=True
) -> Optional[discord.Message]: pass
self,
ctx: Context,
message: Union[str, discord.Embed],
file: discord.File = None,
is_persistent: bool = False,
is_public: bool = False,
wait_before_delete: int = None,
without_tracking=True,
) -> Optional[discord.Message]:
pass
@abstractmethod
async def send_interaction_msg(
self, interaction: Interaction, message: Union[str, discord.Embed], is_persistent: bool = False,
is_public: bool = False, wait_before_delete: int = None, without_tracking=True, **kwargs
): pass
self,
interaction: Interaction,
message: Union[str, discord.Embed],
is_persistent: bool = False,
is_public: bool = False,
wait_before_delete: int = None,
without_tracking=True,
**kwargs
):
pass

View File

@ -7,7 +7,6 @@ from bot_core.configuration.feature_flags_enum import FeatureFlagsEnum
class ModuleABC(StartupExtensionABC):
@abstractmethod
def __init__(self, dc: DiscordCollectionABC, feature_flag: FeatureFlagsEnum):
StartupExtensionABC.__init__(self)

View File

@ -11,16 +11,16 @@ Discord bot for the Keksdose discord Server
"""
__title__ = 'bot_core.configuration'
__author__ = 'Sven Heidemann'
__license__ = 'MIT'
__copyright__ = 'Copyright (c) 2022 sh-edraft.de'
__version__ = '0.3.0'
__title__ = "bot_core.configuration"
__author__ = "Sven Heidemann"
__license__ = "MIT"
__copyright__ = "Copyright (c) 2022 sh-edraft.de"
__version__ = "0.3.0"
from collections import namedtuple
# imports:
VersionInfo = namedtuple('VersionInfo', 'major minor micro')
version_info = VersionInfo(major='0', minor='3', micro='0')
VersionInfo = namedtuple("VersionInfo", "major minor micro")
version_info = VersionInfo(major="0", minor="3", micro="0")

View File

@ -8,7 +8,6 @@ from bot_core.configuration.file_logging_settings import FileLoggingSettings
class BotLoggingSettings(ConfigurationModelABC):
def __init__(self):
ConfigurationModelABC.__init__(self)
self._files: List[FileLoggingSettings] = List(FileLoggingSettings)
@ -22,12 +21,16 @@ class BotLoggingSettings(ConfigurationModelABC):
files = List(FileLoggingSettings)
for s in settings:
st = FileLoggingSettings()
settings[s]['Key'] = s
settings[s]["Key"] = s
st.from_dict(settings[s])
files.append(st)
self._files = files
except Exception as e:
Console.set_foreground_color(ForegroundColorEnum.red)
Console.write_line(f'[ ERROR ] [ {__name__} ]: Reading error in {type(self).__name__} settings')
Console.write_line(f'[ EXCEPTION ] [ {__name__} ]: {e} -> {traceback.format_exc()}')
Console.write_line(
f"[ ERROR ] [ {__name__} ]: Reading error in {type(self).__name__} settings"
)
Console.write_line(
f"[ EXCEPTION ] [ {__name__} ]: {e} -> {traceback.format_exc()}"
)
Console.set_foreground_color(ForegroundColorEnum.default)

View File

@ -8,7 +8,6 @@ from bot_core.configuration.server_settings import ServerSettings
class BotSettings(ConfigurationModelABC):
def __init__(self):
ConfigurationModelABC.__init__(self)
@ -17,7 +16,7 @@ class BotSettings(ConfigurationModelABC):
self._wait_for_restart = 2
self._wait_for_shutdown = 2
self._cache_max_messages = 1000
@property
def servers(self) -> List[ServerSettings]:
return self._servers
@ -47,7 +46,7 @@ class BotSettings(ConfigurationModelABC):
settings.pop("WaitForRestart")
settings.pop("WaitForShutdown")
if 'CacheMaxMessages' in settings:
if "CacheMaxMessages" in settings:
self._cache_max_messages = settings["CacheMaxMessages"]
settings.pop("CacheMaxMessages")
@ -59,5 +58,9 @@ class BotSettings(ConfigurationModelABC):
servers.append(st)
self._servers = servers
except Exception as e:
Console.error(f'[ ERROR ] [ {__name__} ]: Reading error in {type(self).__name__} settings')
Console.error(f'[ EXCEPTION ] [ {__name__} ]: {e} -> {traceback.format_exc()}')
Console.error(
f"[ ERROR ] [ {__name__} ]: Reading error in {type(self).__name__} settings"
)
Console.error(
f"[ EXCEPTION ] [ {__name__} ]: {e} -> {traceback.format_exc()}"
)

View File

@ -3,19 +3,19 @@ from enum import Enum
class FeatureFlagsEnum(Enum):
# modules
api_module = 'ApiModule'
admin_module = 'AdminModule'
auto_role_module = 'AutoRoleModule'
base_module = 'BaseModule'
boot_log_module = 'BootLogModule'
core_module = 'CoreModule'
core_extension_module = 'CoreExtensionModule'
data_module = 'DataModule',
database_module = 'DatabaseModule',
level_module = 'LevelModule'
moderator_module = 'ModeratorModule'
permission_module = 'PermissionModule'
stats_module = 'StatsModule'
api_module = "ApiModule"
admin_module = "AdminModule"
auto_role_module = "AutoRoleModule"
base_module = "BaseModule"
boot_log_module = "BootLogModule"
core_module = "CoreModule"
core_extension_module = "CoreExtensionModule"
data_module = ("DataModule",)
database_module = ("DatabaseModule",)
level_module = "LevelModule"
moderator_module = "ModeratorModule"
permission_module = "PermissionModule"
stats_module = "StatsModule"
# features
api_only = 'ApiOnly'
presence = 'Presence'
api_only = "ApiOnly"
presence = "Presence"

View File

@ -8,7 +8,6 @@ from bot_core.configuration.feature_flags_enum import FeatureFlagsEnum
class FeatureFlagsSettings(ConfigurationModelABC):
def __init__(self):
ConfigurationModelABC.__init__(self)
@ -47,5 +46,9 @@ class FeatureFlagsSettings(ConfigurationModelABC):
for flag in [f.value for f in FeatureFlagsEnum]:
self._load_flag(settings, FeatureFlagsEnum(flag))
except Exception as e:
Console.error(f'[ ERROR ] [ {__name__} ]: Reading error in {type(self).__name__} settings')
Console.error(f'[ EXCEPTION ] [ {__name__} ]: {e} -> {traceback.format_exc()}')
Console.error(
f"[ ERROR ] [ {__name__} ]: Reading error in {type(self).__name__} settings"
)
Console.error(
f"[ EXCEPTION ] [ {__name__} ]: {e} -> {traceback.format_exc()}"
)

View File

@ -5,11 +5,10 @@ from cpl_core.logging import LoggingSettings
class FileLoggingSettings(LoggingSettings):
def __init__(self):
LoggingSettings.__init__(self)
self._key = ''
self._key = ""
@property
def key(self) -> str:
@ -17,8 +16,12 @@ class FileLoggingSettings(LoggingSettings):
def from_dict(self, settings: dict):
try:
self._key = settings['Key']
self._key = settings["Key"]
super().from_dict(settings)
except Exception as e:
Console.error(f'[ ERROR ] [ {__name__} ]: Reading error in {type(self).__name__} settings')
Console.error(f'[ EXCEPTION ] [ {__name__} ]: {e} -> {traceback.format_exc()}')
Console.error(
f"[ ERROR ] [ {__name__} ]: Reading error in {type(self).__name__} settings"
)
Console.error(
f"[ EXCEPTION ] [ {__name__} ]: {e} -> {traceback.format_exc()}"
)

View File

@ -5,7 +5,6 @@ from cpl_core.console import Console
class ServerSettings(ConfigurationModelABC):
def __init__(self):
ConfigurationModelABC.__init__(self)
@ -22,8 +21,10 @@ class ServerSettings(ConfigurationModelABC):
def from_dict(self, settings: dict):
try:
self._id = int(settings['Id'])
self._message_delete_timer = int(settings['MessageDeleteTimer'])
self._id = int(settings["Id"])
self._message_delete_timer = int(settings["MessageDeleteTimer"])
except Exception as e:
Console.error(f'[ ERROR ] [ {__name__} ]: Reading error in settings')
Console.error(f'[ EXCEPTION ] [ {__name__} ]: {e} -> {traceback.format_exc()}')
Console.error(f"[ ERROR ] [ {__name__} ]: Reading error in settings")
Console.error(
f"[ EXCEPTION ] [ {__name__} ]: {e} -> {traceback.format_exc()}"
)

View File

@ -11,16 +11,16 @@ Discord bot for the Keksdose discord Server
"""
__title__ = 'bot_core.core_extension'
__author__ = 'Sven Heidemann'
__license__ = 'MIT'
__copyright__ = 'Copyright (c) 2022 sh-edraft.de'
__version__ = '0.3.0'
__title__ = "bot_core.core_extension"
__author__ = "Sven Heidemann"
__license__ = "MIT"
__copyright__ = "Copyright (c) 2022 sh-edraft.de"
__version__ = "0.3.0"
from collections import namedtuple
# imports:
VersionInfo = namedtuple('VersionInfo', 'major minor micro')
version_info = VersionInfo(major='0', minor='3', micro='0')
VersionInfo = namedtuple("VersionInfo", "major minor micro")
version_info = VersionInfo(major="0", minor="3", micro="0")

View File

@ -13,12 +13,13 @@ from modules.permission.abc.permission_service_abc import PermissionServiceABC
class CoreExtension(ApplicationExtensionABC):
def __init__(self):
ApplicationExtensionABC.__init__(self)
async def run(self, config: ConfigurationABC, services: ServiceProviderABC):
feature_flags: FeatureFlagsSettings = config.get_configuration(FeatureFlagsSettings)
feature_flags: FeatureFlagsSettings = config.get_configuration(
FeatureFlagsSettings
)
if not feature_flags.get_flag(FeatureFlagsEnum.core_module):
return

View File

@ -6,16 +6,23 @@ from cpl_discord.service.discord_collection_abc import DiscordCollectionABC
from bot_core.abc.module_abc import ModuleABC
from bot_core.configuration.feature_flags_enum import FeatureFlagsEnum
from bot_core.core_extension.core_extension_on_ready_event import CoreExtensionOnReadyEvent
from bot_core.core_extension.core_extension_on_ready_event import (
CoreExtensionOnReadyEvent,
)
class CoreExtensionModule(ModuleABC):
def __init__(self, dc: DiscordCollectionABC):
ModuleABC.__init__(self, dc, FeatureFlagsEnum.core_extension_module)
def configure_configuration(self, config: ConfigurationABC, env: ApplicationEnvironmentABC):
def configure_configuration(
self, config: ConfigurationABC, env: ApplicationEnvironmentABC
):
pass
def configure_services(self, services: ServiceCollectionABC, env: ApplicationEnvironmentABC):
self._dc.add_event(DiscordEventTypesEnum.on_ready.value, CoreExtensionOnReadyEvent)
def configure_services(
self, services: ServiceCollectionABC, env: ApplicationEnvironmentABC
):
self._dc.add_event(
DiscordEventTypesEnum.on_ready.value, CoreExtensionOnReadyEvent
)

View File

@ -9,13 +9,12 @@ from bot_core.abc.client_utils_abc import ClientUtilsABC
class CoreExtensionOnReadyEvent(OnReadyABC):
def __init__(
self,
logger: LoggerABC,
bot: DiscordBotServiceABC,
client_utils: ClientUtilsABC,
t: TranslatePipe
self,
logger: LoggerABC,
bot: DiscordBotServiceABC,
client_utils: ClientUtilsABC,
t: TranslatePipe,
):
OnReadyABC.__init__(self)
@ -24,9 +23,9 @@ class CoreExtensionOnReadyEvent(OnReadyABC):
self._client_utils = client_utils
self._t = t
self._logger.info(__name__, f'Module {type(self)} loaded')
self._logger.info(__name__, f"Module {type(self)} loaded")
async def on_ready(self):
self._logger.debug(__name__, f'Module {type(self)} started')
await self._client_utils.presence_game('common.presence.running')
self._logger.trace(__name__, f'Module {type(self)} stopped')
self._logger.debug(__name__, f"Module {type(self)} started")
await self._client_utils.presence_game("common.presence.running")
self._logger.trace(__name__, f"Module {type(self)} stopped")

View File

@ -15,14 +15,17 @@ from bot_core.service.message_service import MessageService
class CoreModule(ModuleABC):
def __init__(self, dc: DiscordCollectionABC):
ModuleABC.__init__(self, dc, FeatureFlagsEnum.core_module)
def configure_configuration(self, config: ConfigurationABC, env: ApplicationEnvironmentABC):
def configure_configuration(
self, config: ConfigurationABC, env: ApplicationEnvironmentABC
):
pass
def configure_services(self, services: ServiceCollectionABC, env: ApplicationEnvironmentABC):
def configure_services(
self, services: ServiceCollectionABC, env: ApplicationEnvironmentABC
):
services.add_transient(MessageServiceABC, MessageService)
services.add_transient(ClientUtilsABC, ClientUtilsService)

View File

@ -11,16 +11,16 @@ Discord bot for the Keksdose discord Server
"""
__title__ = 'bot_core.events'
__author__ = 'Sven Heidemann'
__license__ = 'MIT'
__copyright__ = 'Copyright (c) 2022 sh-edraft.de'
__version__ = '0.3.0'
__title__ = "bot_core.events"
__author__ = "Sven Heidemann"
__license__ = "MIT"
__copyright__ = "Copyright (c) 2022 sh-edraft.de"
__version__ = "0.3.0"
from collections import namedtuple
# imports:
VersionInfo = namedtuple('VersionInfo', 'major minor micro')
version_info = VersionInfo(major='0', minor='3', micro='0')
VersionInfo = namedtuple("VersionInfo", "major minor micro")
version_info = VersionInfo(major="0", minor="3", micro="0")

View File

@ -7,13 +7,12 @@ from bot_core.abc.client_utils_abc import ClientUtilsABC
class CoreOnReadyEvent(OnReadyABC):
def __init__(
self,
logger: LoggerABC,
bot: DiscordBotServiceABC,
client_utils: ClientUtilsABC,
t: TranslatePipe,
self,
logger: LoggerABC,
bot: DiscordBotServiceABC,
client_utils: ClientUtilsABC,
t: TranslatePipe,
):
OnReadyABC.__init__(self)
@ -22,9 +21,9 @@ class CoreOnReadyEvent(OnReadyABC):
self._client_utils = client_utils
self._t = t
self._logger.info(__name__, f'Module {type(self)} loaded')
self._logger.info(__name__, f"Module {type(self)} loaded")
async def on_ready(self):
self._logger.debug(__name__, f'Module {type(self)} started')
await self._client_utils.presence_game('common.presence.booting')
self._logger.trace(__name__, f'Module {type(self)} stopped')
self._logger.debug(__name__, f"Module {type(self)} started")
await self._client_utils.presence_game("common.presence.booting")
self._logger.trace(__name__, f"Module {type(self)} stopped")

View File

@ -11,16 +11,16 @@ Discord bot for the Keksdose discord Server
"""
__title__ = 'bot_core.exception'
__author__ = 'Sven Heidemann'
__license__ = 'MIT'
__copyright__ = 'Copyright (c) 2022 sh-edraft.de'
__version__ = '0.3.0'
__title__ = "bot_core.exception"
__author__ = "Sven Heidemann"
__license__ = "MIT"
__copyright__ = "Copyright (c) 2022 sh-edraft.de"
__version__ = "0.3.0"
from collections import namedtuple
# imports:
VersionInfo = namedtuple('VersionInfo', 'major minor micro')
version_info = VersionInfo(major='0', minor='3', micro='0')
VersionInfo = namedtuple("VersionInfo", "major minor micro")
version_info = VersionInfo(major="0", minor="3", micro="0")

View File

@ -2,6 +2,5 @@ from discord.ext.commands import CommandError
class CheckError(CommandError):
def __init__(self, message, *args):
CommandError.__init__(self, message, *args)

View File

@ -11,16 +11,16 @@ Discord bot for the Keksdose discord Server
"""
__title__ = 'bot_core.helper'
__author__ = 'Sven Heidemann'
__license__ = 'MIT'
__copyright__ = 'Copyright (c) 2022 sh-edraft.de'
__version__ = '0.3.0'
__title__ = "bot_core.helper"
__author__ = "Sven Heidemann"
__license__ = "MIT"
__copyright__ = "Copyright (c) 2022 sh-edraft.de"
__version__ = "0.3.0"
from collections import namedtuple
# imports:
VersionInfo = namedtuple('VersionInfo', 'major minor micro')
version_info = VersionInfo(major='0', minor='3', micro='0')
VersionInfo = namedtuple("VersionInfo", "major minor micro")
version_info = VersionInfo(major="0", minor="3", micro="0")

View File

@ -18,11 +18,11 @@ class CommandChecks:
@classmethod
def init(
cls,
permissions: PermissionServiceABC,
client_utils: ClientUtilsABC,
message_service: MessageServiceABC,
translate: TranslatePipe,
cls,
permissions: PermissionServiceABC,
client_utils: ClientUtilsABC,
message_service: MessageServiceABC,
translate: TranslatePipe,
):
cls._permissions = permissions
cls._client_utils = client_utils
@ -34,7 +34,7 @@ class CommandChecks:
async def check_if_bot_is_ready_yet_and_respond(ctx: Context) -> bool:
result = await cls._client_utils.check_if_bot_is_ready_yet_and_respond(ctx)
if not result:
raise CheckError(f'Bot is not ready')
raise CheckError(f"Bot is not ready")
return result
return commands.check(check_if_bot_is_ready_yet_and_respond)
@ -44,8 +44,10 @@ class CommandChecks:
async def check_is_member_admin(ctx: Context):
has_permission = cls._permissions.is_member_admin(ctx.author)
if not has_permission:
await cls._message_service.send_ctx_msg(ctx, cls._t.transform('common.no_permission_message'))
raise CheckError(f'Member {ctx.author.name} is not admin')
await cls._message_service.send_ctx_msg(
ctx, cls._t.transform("common.no_permission_message")
)
raise CheckError(f"Member {ctx.author.name} is not admin")
return has_permission
@ -56,8 +58,10 @@ class CommandChecks:
async def check_is_member_technician(ctx: Context):
has_permission = cls._permissions.is_member_technician(ctx.author)
if not has_permission:
await cls._message_service.send_ctx_msg(ctx, cls._t.transform('common.no_permission_message'))
raise CheckError(f'Member {ctx.author.name} is not technician')
await cls._message_service.send_ctx_msg(
ctx, cls._t.transform("common.no_permission_message")
)
raise CheckError(f"Member {ctx.author.name} is not technician")
return has_permission
@ -68,8 +72,10 @@ class CommandChecks:
async def check_is_member_moderator(ctx: Context):
has_permission = cls._permissions.is_member_moderator(ctx.author)
if not has_permission:
await cls._message_service.send_ctx_msg(ctx, cls._t.transform('common.no_permission_message'))
raise CheckError(f'Member {ctx.author.name} is not moderator')
await cls._message_service.send_ctx_msg(
ctx, cls._t.transform("common.no_permission_message")
)
raise CheckError(f"Member {ctx.author.name} is not moderator")
return has_permission

View File

@ -15,8 +15,8 @@ class EventChecks:
@classmethod
def init(
cls,
client_utils: ClientUtilsABC,
cls,
client_utils: ClientUtilsABC,
):
cls._client_utils = client_utils
@ -25,7 +25,7 @@ class EventChecks:
async def check_if_bot_is_ready() -> bool:
result = await cls._client_utils.check_if_bot_is_ready()
if not result:
raise CheckError(f'Bot is not ready')
raise CheckError(f"Bot is not ready")
return result
return commands.check(check_if_bot_is_ready)

View File

@ -2,8 +2,7 @@ import discord
class LogMessageHelper:
@staticmethod
def get_log_string(message: discord.Message):
content = message.content.replace("\n", "\n\t")
return f'{message.author} @ {message.channel} -> \n\t{content}'
return f"{message.author} @ {message.channel} -> \n\t{content}"

View File

@ -11,16 +11,16 @@ Discord bot for the Keksdose discord Server
"""
__title__ = 'bot_core.logging'
__author__ = 'Sven Heidemann'
__license__ = 'MIT'
__copyright__ = 'Copyright (c) 2022 sh-edraft.de'
__version__ = '0.3.0'
__title__ = "bot_core.logging"
__author__ = "Sven Heidemann"
__license__ = "MIT"
__copyright__ = "Copyright (c) 2022 sh-edraft.de"
__version__ = "0.3.0"
from collections import namedtuple
# imports
VersionInfo = namedtuple('VersionInfo', 'major minor micro')
version_info = VersionInfo(major='0', minor='3', micro='0')
VersionInfo = namedtuple("VersionInfo", "major minor micro")
version_info = VersionInfo(major="0", minor="3", micro="0")

View File

@ -6,6 +6,10 @@ from bot_core.abc.custom_file_logger_abc import CustomFileLoggerABC
class CommandLogger(CustomFileLoggerABC):
def __init__(self, config: ConfigurationABC, time_format: TimeFormatSettings, env: ApplicationEnvironmentABC):
CustomFileLoggerABC.__init__(self, 'Command', config, time_format, env)
def __init__(
self,
config: ConfigurationABC,
time_format: TimeFormatSettings,
env: ApplicationEnvironmentABC,
):
CustomFileLoggerABC.__init__(self, "Command", config, time_format, env)

View File

@ -6,6 +6,10 @@ from bot_core.abc.custom_file_logger_abc import CustomFileLoggerABC
class DatabaseLogger(CustomFileLoggerABC):
def __init__(self, config: ConfigurationABC, time_format: TimeFormatSettings, env: ApplicationEnvironmentABC):
CustomFileLoggerABC.__init__(self, 'Database', config, time_format, env)
def __init__(
self,
config: ConfigurationABC,
time_format: TimeFormatSettings,
env: ApplicationEnvironmentABC,
):
CustomFileLoggerABC.__init__(self, "Database", config, time_format, env)

View File

@ -6,6 +6,10 @@ from bot_core.abc.custom_file_logger_abc import CustomFileLoggerABC
class MessageLogger(CustomFileLoggerABC):
def __init__(self, config: ConfigurationABC, time_format: TimeFormatSettings, env: ApplicationEnvironmentABC):
CustomFileLoggerABC.__init__(self, 'Message', config, time_format, env)
def __init__(
self,
config: ConfigurationABC,
time_format: TimeFormatSettings,
env: ApplicationEnvironmentABC,
):
CustomFileLoggerABC.__init__(self, "Message", config, time_format, env)

View File

@ -11,16 +11,16 @@ Discord bot for the Keksdose discord Server
"""
__title__ = 'bot_core.pipes'
__author__ = 'Sven Heidemann'
__license__ = 'MIT'
__copyright__ = 'Copyright (c) 2022 sh-edraft.de'
__version__ = '0.3.0'
__title__ = "bot_core.pipes"
__author__ = "Sven Heidemann"
__license__ = "MIT"
__copyright__ = "Copyright (c) 2022 sh-edraft.de"
__version__ = "0.3.0"
from collections import namedtuple
# imports
VersionInfo = namedtuple('VersionInfo', 'major minor micro')
version_info = VersionInfo(major='0', minor='3', micro='0')
VersionInfo = namedtuple("VersionInfo", "major minor micro")
version_info = VersionInfo(major="0", minor="3", micro="0")

View File

@ -4,8 +4,8 @@ from cpl_core.pipes.pipe_abc import PipeABC
class DateTimeOffsetPipe(PipeABC):
def __init__(self): pass
def __init__(self):
pass
def transform(self, value: datetime, *args):
return value.strftime('%Y-%m-%d %H:%M')
return value.strftime("%Y-%m-%d %H:%M")

View File

@ -11,16 +11,16 @@ Discord bot for the Keksdose discord Server
"""
__title__ = 'bot_core.service'
__author__ = 'Sven Heidemann'
__license__ = 'MIT'
__copyright__ = 'Copyright (c) 2022 sh-edraft.de'
__version__ = '0.3.0'
__title__ = "bot_core.service"
__author__ = "Sven Heidemann"
__license__ = "MIT"
__copyright__ = "Copyright (c) 2022 sh-edraft.de"
__version__ = "0.3.0"
from collections import namedtuple
# imports:
VersionInfo = namedtuple('VersionInfo', 'major minor micro')
version_info = VersionInfo(major='0', minor='3', micro='0')
VersionInfo = namedtuple("VersionInfo", "major minor micro")
version_info = VersionInfo(major="0", minor="3", micro="0")

Some files were not shown because too many files have changed in this diff Show More