Moved bot to kdb-bot #70

This commit is contained in:
2022-10-15 23:17:45 +02:00
parent 029b46d7de
commit ead3f69a69
216 changed files with 0 additions and 0 deletions

View File

@@ -0,0 +1,26 @@
# -*- coding: utf-8 -*-
"""
bot Keksdose bot
~~~~~~~~~~~~~~~~~~~
Discord bot for the Keksdose discord Server
:copyright: (c) 2022 sh-edraft.de
:license: MIT, see LICENSE for more details.
"""
__title__ = 'bot_api'
__author__ = 'Sven Heidemann'
__license__ = 'MIT'
__copyright__ = 'Copyright (c) 2022 sh-edraft.de'
__version__ = '0.2.3'
from collections import namedtuple
# imports:
VersionInfo = namedtuple('VersionInfo', 'major minor micro')
version_info = VersionInfo(major='0', minor='2', micro='3')

View File

@@ -0,0 +1,26 @@
# -*- coding: utf-8 -*-
"""
bot Keksdose bot
~~~~~~~~~~~~~~~~~~~
Discord bot for the Keksdose discord Server
:copyright: (c) 2022 sh-edraft.de
:license: MIT, see LICENSE for more details.
"""
__title__ = 'bot_api.abc'
__author__ = 'Sven Heidemann'
__license__ = 'MIT'
__copyright__ = 'Copyright (c) 2022 sh-edraft.de'
__version__ = '0.2.3'
from collections import namedtuple
# imports:
VersionInfo = namedtuple('VersionInfo', 'major minor micro')
version_info = VersionInfo(major='0', minor='2', micro='3')

View File

@@ -0,0 +1,65 @@
from abc import ABC, abstractmethod
from cpl_query.extension import List
from bot_api.filter.auth_user_select_criteria import AuthUserSelectCriteria
from bot_api.model.auth_user_dto import AuthUserDTO
from bot_api.model.auth_user_filtered_result_dto import AuthUserFilteredResultDTO
from bot_api.model.email_string_dto import EMailStringDTO
from bot_api.model.reset_password_dto import ResetPasswordDTO
from bot_api.model.token_dto import TokenDTO
from bot_api.model.update_auth_user_dto import UpdateAuthUserDTO
class AuthServiceABC(ABC):
@abstractmethod
def __init__(self): pass
@abstractmethod
async def get_all_auth_users_async(self) -> List[AuthUserDTO]: pass
@abstractmethod
async def get_filtered_auth_users_async(self, criteria: AuthUserSelectCriteria) -> AuthUserFilteredResultDTO: pass
@abstractmethod
async def get_auth_user_by_email_async(self, email: str) -> AuthUserDTO: pass
@abstractmethod
async def find_auth_user_by_email_async(self, email: str) -> AuthUserDTO: pass
@abstractmethod
async def add_auth_user_async(self, user_dto: AuthUserDTO) -> int: pass
@abstractmethod
async def update_user_async(self, update_user_dto: UpdateAuthUserDTO): pass
@abstractmethod
async def update_user_as_admin_async(self, update_user_dto: UpdateAuthUserDTO): pass
@abstractmethod
async def delete_auth_user_by_email_async(self, email: str): pass
@abstractmethod
async def delete_auth_user_async(self, user_dto: AuthUserDTO): pass
@abstractmethod
async def login_async(self, user_dto: AuthUserDTO) -> TokenDTO: pass
@abstractmethod
async def refresh_async(self, token_dto: TokenDTO) -> TokenDTO: pass
@abstractmethod
async def revoke_async(self, token_dto: TokenDTO): pass
@abstractmethod
async def confirm_email_async(self, id: str) -> bool: pass
@abstractmethod
async def forgot_password_async(self, email: str): pass
@abstractmethod
async def confirm_forgot_password_async(self, id: str) -> EMailStringDTO: pass
@abstractmethod
async def reset_password_async(self, rp_dto: ResetPasswordDTO): pass

View File

@@ -0,0 +1,16 @@
from abc import abstractmethod
from cpl_core.database import TableABC
from bot_api.abc.dto_abc import DtoABC
class AuthUserTransformerABC:
@staticmethod
@abstractmethod
def to_db(dto: DtoABC) -> TableABC: pass
@staticmethod
@abstractmethod
def to_dto(db: TableABC) -> DtoABC: pass

View File

@@ -0,0 +1,13 @@
from abc import ABC, abstractmethod
class DtoABC(ABC):
@abstractmethod
def __init__(self): pass
@abstractmethod
def from_dict(self, values: dict): pass
@abstractmethod
def to_dict(self) -> dict: pass

View File

@@ -0,0 +1,17 @@
from abc import ABC, abstractmethod
class SelectCriteriaABC(ABC):
@abstractmethod
def __init__(
self,
page_index: int,
page_size: int,
sort_direction: str,
sort_column: str
):
self.page_index = page_index
self.page_size = page_size
self.sort_direction = sort_direction
self.sort_column = sort_column

View File

@@ -0,0 +1,81 @@
import json
import sys
import uuid
from functools import partial
from cpl_core.dependency_injection import ServiceProviderABC
from flask import Flask, request, jsonify, Response, make_response
from flask_cors import CORS
from bot_api.configuration.api_settings import ApiSettings
from bot_api.exception.service_exception import ServiceException
from bot_api.logging.api_logger import ApiLogger
from bot_api.model.error_dto import ErrorDTO
from bot_api.route.route import Route
class Api(Flask):
def __init__(
self,
logger: ApiLogger,
services: ServiceProviderABC,
api_settings: ApiSettings,
*args, **kwargs
):
if not args:
kwargs.setdefault('import_name', __name__)
Flask.__init__(self, *args, **kwargs)
self._logger = logger
self._services = services
self._apt_settings = api_settings
self._cors = CORS(self, support_credentials=True)
# register before request
self.before_request_funcs.setdefault(None, []).append(self.before_request)
exc_class, code = self._get_exc_class_and_code(Exception)
self.error_handler_spec[None][code][exc_class] = self.handle_exception
def _register_routes(self):
for path, f in Route.registered_routes.items():
route = f[0]
kwargs = f[1]
cls = None
qual_name_split = route.__qualname__.split('.')
if len(qual_name_split) > 0:
cls_type = vars(sys.modules[route.__module__])[qual_name_split[0]]
cls = self._services.get_service(cls_type)
partial_f = partial(route, self if cls is None else cls)
partial_f.__name__ = route.__name__
self.route(path, **kwargs)(partial_f)
def handle_exception(self, e: Exception):
self._logger.error(__name__, f'Caught error', e)
if isinstance(e, ServiceException):
ex: ServiceException = e
self._logger.error(__name__, ex.get_detailed_message())
error = ErrorDTO(ex.error_code, ex.message)
return jsonify(error.to_dict()), 500
else:
tracking_id = uuid.uuid4()
user_message = f'Tracking Id: {tracking_id}'
self._logger.error(__name__, user_message, e)
error = ErrorDTO(None, user_message)
return jsonify(error.to_dict()), 400
def before_request(self, *args, **kwargs):
self._logger.debug(__name__, f'Received GET @{request.url}')
headers = str(request.headers).replace("\n", "\n\t")
self._logger.trace(__name__, f'Headers: \n\t{headers}')
def start(self):
self._logger.info(__name__, f'Starting API {self._apt_settings.host}:{self._apt_settings.port}')
self._register_routes()
from waitress import serve
# https://docs.pylonsproject.org/projects/waitress/en/stable/arguments.html
serve(self, host=self._apt_settings.host, port=self._apt_settings.port, threads=10, connection_limit=1000, channel_timeout=10)

View File

@@ -0,0 +1,41 @@
import os
from cpl_core.configuration import ConfigurationABC
from cpl_core.dependency_injection import ServiceCollectionABC
from cpl_core.environment import ApplicationEnvironmentABC
from cpl_core.mailing import EMailClientABC, EMailClient
from cpl_discord.service.discord_collection_abc import DiscordCollectionABC
from flask import Flask
from bot_api.abc.auth_service_abc import AuthServiceABC
from bot_api.api import Api
from bot_api.api_thread import ApiThread
from bot_api.controller.gui_controller import GuiController
from bot_api.controller.auth_controller import AuthController
from bot_api.service.auth_service import AuthService
from bot_core.abc.module_abc import ModuleABC
from bot_core.configuration.feature_flags_enum import FeatureFlagsEnum
class ApiModule(ModuleABC):
def __init__(self, dc: DiscordCollectionABC):
ModuleABC.__init__(self, dc, FeatureFlagsEnum.api_module)
def configure_configuration(self, config: ConfigurationABC, env: ApplicationEnvironmentABC):
cwd = env.working_directory
env.set_working_directory(os.path.dirname(os.path.realpath(__file__)))
config.add_json_file(f'config/apisettings.json', optional=False)
config.add_json_file(f'config/apisettings.{env.environment_name}.json', optional=True)
config.add_json_file(f'config/apisettings.{env.host_name}.json', optional=True)
env.set_working_directory(cwd)
def configure_services(self, services: ServiceCollectionABC, env: ApplicationEnvironmentABC):
services.add_singleton(EMailClientABC, EMailClient)
services.add_singleton(ApiThread)
services.add_singleton(Flask, Api)
services.add_transient(AuthServiceABC, AuthService)
services.add_transient(AuthController)
services.add_transient(GuiController)

View File

@@ -0,0 +1,27 @@
import threading
from bot_api.api import Api
from bot_api.logging.api_logger import ApiLogger
from bot_core.configuration.feature_flags_enum import FeatureFlagsEnum
from bot_core.configuration.feature_flags_settings import FeatureFlagsSettings
class ApiThread(threading.Thread):
def __init__(
self,
logger: ApiLogger,
api: Api,
feature_flags: FeatureFlagsSettings
):
threading.Thread.__init__(self, daemon=True)
self._logger = logger
self._api = api
def run(self) -> None:
try:
self._logger.trace(__name__, f'Try to start {type(self._api).__name__}')
self._api.start()
except Exception as e:
self._logger.error(__name__, 'Start failed', e)

View File

@@ -0,0 +1,52 @@
{
"ProjectSettings": {
"Name": "bot-api",
"Version": {
"Major": "0",
"Minor": "0",
"Micro": "0"
},
"Author": "",
"AuthorEmail": "",
"Description": "",
"LongDescription": "",
"URL": "",
"CopyrightDate": "",
"CopyrightName": "",
"LicenseName": "",
"LicenseDescription": "",
"Dependencies": [
"cpl-core==2022.10.0.post6",
"Flask==2.2.2",
"Flask[async]==2.2.2",
"Flask-Classful==0.14.2",
"Flask-Cors==3.0.10",
"PyJWT[crypto]==2.5.0",
"PyJWT==2.5.0"
],
"DevDependencies": [
"cpl-cli==2022.10.0"
],
"PythonVersion": ">=3.10.4",
"PythonPath": {
"linux": ""
},
"Classifiers": []
},
"BuildSettings": {
"ProjectType": "library",
"SourcePath": "",
"OutputPath": "../../dist",
"Main": "bot_api.main",
"EntryPoint": "bot-api",
"IncludePackageData": false,
"Included": [],
"Excluded": [
"*/__pycache__",
"*/logs",
"*/tests"
],
"PackageData": {},
"ProjectReferences": []
}
}

View File

@@ -0,0 +1,8 @@
{
"EMailClientSettings": {
"Host": "mail.sh-edraft.de",
"Port": "587",
"UserName": "dev-srv@sh-edraft.de",
"Credentials": "RmBOQX1eNFYiYjgsSid3fV1nelc2WA=="
}
}

View File

@@ -0,0 +1,17 @@
{
"Api": {
"Port": 5000,
"Host": "0.0.0.0",
"RedirectToHTTPS": false
},
"Authentication": {
"SecretKey": "F3b5LDz+#Jvzg=W!@gsa%xsF",
"Issuer": "http://localhost:5000",
"Audience": "http://localhost:4200",
"TokenExpireTime": 1,
"RefreshTokenExpireTime": 7
},
"Frontend": {
"URL": "http://localhost:4200/"
}
}

View File

@@ -0,0 +1 @@
{}

View File

@@ -0,0 +1 @@
{}

View File

@@ -0,0 +1 @@
{}

View File

@@ -0,0 +1 @@
{}

View File

@@ -0,0 +1 @@
{}

View File

@@ -0,0 +1,26 @@
# -*- coding: utf-8 -*-
"""
bot Keksdose bot
~~~~~~~~~~~~~~~~~~~
Discord bot for the Keksdose discord Server
:copyright: (c) 2022 sh-edraft.de
:license: MIT, see LICENSE for more details.
"""
__title__ = 'bot_api.configuration'
__author__ = 'Sven Heidemann'
__license__ = 'MIT'
__copyright__ = 'Copyright (c) 2022 sh-edraft.de'
__version__ = '0.2.3'
from collections import namedtuple
# imports
VersionInfo = namedtuple('VersionInfo', 'major minor micro')
version_info = VersionInfo(major='0', minor='2', micro='3')

View File

@@ -0,0 +1,35 @@
import traceback
from cpl_core.configuration.configuration_model_abc import ConfigurationModelABC
from cpl_core.console import Console
class ApiSettings(ConfigurationModelABC):
def __init__(self):
ConfigurationModelABC.__init__(self)
self._port = 80
self._host = ''
self._redirect_to_https = False
@property
def port(self) -> int:
return self._port
@property
def host(self) -> str:
return self._host
@property
def redirect_to_https(self) -> bool:
return self._redirect_to_https
def from_dict(self, settings: dict):
try:
self._port = int(settings['Port'])
self._host = settings['Host']
self._redirect_to_https = bool(settings['RedirectToHTTPS'])
except Exception as e:
Console.error(f'[ ERROR ] [ {__name__} ]: Reading error in {type(self).__name__} settings')
Console.error(f'[ EXCEPTION ] [ {__name__} ]: {e} -> {traceback.format_exc()}')

View File

@@ -0,0 +1,48 @@
import traceback
from datetime import datetime
from cpl_core.configuration.configuration_model_abc import ConfigurationModelABC
from cpl_core.console import Console
class AuthenticationSettings(ConfigurationModelABC):
def __init__(self):
ConfigurationModelABC.__init__(self)
self._secret_key = ''
self._issuer = ''
self._audience = ''
self._token_expire_time = 0
self._refresh_token_expire_time = 0
@property
def secret_key(self) -> str:
return self._secret_key
@property
def issuer(self) -> str:
return self._issuer
@property
def audience(self) -> str:
return self._audience
@property
def token_expire_time(self) -> int:
return self._token_expire_time
@property
def refresh_token_expire_time(self) -> int:
return self._refresh_token_expire_time
def from_dict(self, settings: dict):
try:
self._secret_key = settings['SecretKey']
self._issuer = settings['Issuer']
self._audience = settings['Audience']
self._token_expire_time = int(settings['TokenExpireTime'])
self._refresh_token_expire_time = int(settings['RefreshTokenExpireTime'])
except Exception as e:
Console.error(f'[ ERROR ] [ {__name__} ]: Reading error in {type(self).__name__} settings')
Console.error(f'[ EXCEPTION ] [ {__name__} ]: {e} -> {traceback.format_exc()}')

View File

@@ -0,0 +1,23 @@
import traceback
from cpl_core.configuration.configuration_model_abc import ConfigurationModelABC
from cpl_core.console import Console
class FrontendSettings(ConfigurationModelABC):
def __init__(self):
ConfigurationModelABC.__init__(self)
self._url = ''
@property
def url(self) -> str:
return self._url
def from_dict(self, settings: dict):
try:
self._url = settings['URL']
except Exception as e:
Console.error(f'[ ERROR ] [ {__name__} ]: Reading error in {type(self).__name__} settings')
Console.error(f'[ EXCEPTION ] [ {__name__} ]: {e} -> {traceback.format_exc()}')

View File

@@ -0,0 +1,55 @@
from typing import Optional
from cpl_core.configuration.configuration_model_abc import ConfigurationModelABC
from cpl_cli.configuration.version_settings_name_enum import VersionSettingsNameEnum
class VersionSettings(ConfigurationModelABC):
def __init__(
self,
major: str = None,
minor: str = None,
micro: str = None
):
ConfigurationModelABC.__init__(self)
self._major: Optional[str] = major
self._minor: Optional[str] = minor
self._micro: Optional[str] = micro
@property
def major(self) -> str:
return self._major
@property
def minor(self) -> str:
return self._minor
@property
def micro(self) -> str:
return self._micro
def to_str(self) -> str:
if self._micro is None:
return f'{self._major}.{self._minor}'
else:
return f'{self._major}.{self._minor}.{self._micro}'
def from_dict(self, settings: dict):
self._major = settings[VersionSettingsNameEnum.major.value]
self._minor = settings[VersionSettingsNameEnum.minor.value]
micro = settings[VersionSettingsNameEnum.micro.value]
if micro != '':
self._micro = micro
def to_dict(self) -> dict:
version = {
VersionSettingsNameEnum.major.value: self._major,
VersionSettingsNameEnum.minor.value: self._minor,
}
if self._micro is not None:
version[VersionSettingsNameEnum.micro.value] = self._micro
return version

View File

@@ -0,0 +1,26 @@
# -*- coding: utf-8 -*-
"""
bot Keksdose bot
~~~~~~~~~~~~~~~~~~~
Discord bot for the Keksdose discord Server
:copyright: (c) 2022 sh-edraft.de
:license: MIT, see LICENSE for more details.
"""
__title__ = 'bot_api.controller'
__author__ = 'Sven Heidemann'
__license__ = 'MIT'
__copyright__ = 'Copyright (c) 2022 sh-edraft.de'
__version__ = '0.2.3'
from collections import namedtuple
# imports:
VersionInfo = namedtuple('VersionInfo', 'major minor micro')
version_info = VersionInfo(major='0', minor='2', micro='3')

View File

@@ -0,0 +1,118 @@
from cpl_core.configuration import ConfigurationABC
from cpl_core.environment import ApplicationEnvironmentABC
from cpl_core.mailing import EMailClientABC, EMailClientSettings
from cpl_translation import TranslatePipe
from flask import request, jsonify, Response
from bot_api.abc.auth_service_abc import AuthServiceABC
from bot_api.api import Api
from bot_api.filter.auth_user_select_criteria import AuthUserSelectCriteria
from bot_api.json_processor import JSONProcessor
from bot_api.logging.api_logger import ApiLogger
from bot_api.model.auth_user_dto import AuthUserDTO
from bot_api.model.token_dto import TokenDTO
from bot_api.model.update_auth_user_dto import UpdateAuthUserDTO
from bot_api.route.route import Route
class AuthController:
BasePath = '/api/auth'
def __init__(
self,
config: ConfigurationABC,
env: ApplicationEnvironmentABC,
logger: ApiLogger,
t: TranslatePipe,
api: Api,
mail_settings: EMailClientSettings,
mailer: EMailClientABC,
auth_service: AuthServiceABC
):
self._config = config
self._env = env
self._logger = logger
self._t = t
self._api = api
self._mail_settings = mail_settings
self._mailer = mailer
self._auth_service = auth_service
@Route.get(f'{BasePath}/users')
async def get_all_users(self) -> Response:
result = await self._auth_service.get_all_auth_users_async()
return jsonify(result.select(lambda x: x.to_dict()))
@Route.post(f'{BasePath}/users/get/filtered')
async def get_filtered_users(self) -> Response:
dto: AuthUserSelectCriteria = JSONProcessor.process(AuthUserSelectCriteria, request.get_json(force=True, silent=True))
result = await self._auth_service.get_filtered_auth_users_async(dto)
result.result = result.result.select(lambda x: x.to_dict())
return jsonify(result.to_dict())
@Route.get(f'{BasePath}/users/get/<email>')
async def get_user_from_email(self, email: str) -> Response:
result = await self._auth_service.get_auth_user_by_email_async(email)
return jsonify(result.to_dict())
@Route.get(f'{BasePath}/users/find/<email>')
async def find_user_from_email(self, email: str) -> Response:
result = await self._auth_service.find_auth_user_by_email_async(email)
return jsonify(result.to_dict())
@Route.post(f'{BasePath}/register')
async def register(self):
dto: AuthUserDTO = JSONProcessor.process(AuthUserDTO, request.get_json(force=True, silent=True))
await self._auth_service.add_auth_user_async(dto)
return '', 200
@Route.post(f'{BasePath}/login')
async def login(self) -> Response:
dto: AuthUserDTO = JSONProcessor.process(AuthUserDTO, request.get_json(force=True, silent=True))
result = await self._auth_service.login_async(dto)
return jsonify(result.to_dict())
@Route.post(f'{BasePath}/forgot-password/<email>')
async def forgot_password(self, email: str):
await self._auth_service.forgot_password_async(email)
return '', 200
@Route.post(f'{BasePath}/confirm-forgot-password/<id>')
async def confirm_forgot_password(self, id: str):
await self._auth_service.confirm_forgot_password_async(id)
return '', 200
@Route.post(f'{BasePath}/update-user')
async def update_user(self):
dto: UpdateAuthUserDTO = JSONProcessor.process(UpdateAuthUserDTO, request.get_json(force=True, silent=True))
await self._auth_service.update_user_async(dto)
return '', 200
@Route.post(f'{BasePath}/update-user-as-admin')
async def update_user_as_admin(self):
dto: UpdateAuthUserDTO = JSONProcessor.process(UpdateAuthUserDTO, request.get_json(force=True, silent=True))
await self._auth_service.update_user_async(dto)
return '', 200
@Route.post(f'{BasePath}/refresh')
async def refresh(self) -> Response:
dto: TokenDTO = JSONProcessor.process(TokenDTO, request.get_json(force=True, silent=True))
result = await self._auth_service.refresh_async(dto)
return jsonify(result.to_dict())
@Route.post(f'{BasePath}/revoke')
async def revoke(self):
dto: TokenDTO = JSONProcessor.process(TokenDTO, request.get_json(force=True, silent=True))
await self._auth_service.revoke_async(dto)
return '', 200
@Route.post(f'{BasePath}/delete-user')
async def delete_user(self):
dto: AuthUserDTO = JSONProcessor.process(AuthUserDTO, request.get_json(force=True, silent=True))
await self._auth_service.delete_auth_user_async(dto)
return '', 200
@Route.post(f'{BasePath}/delete-user-by-mail/<email>')
async def delete_user_by_mail(self, email: str):
await self._auth_service.delete_auth_user_by_email_async(email)
return '', 200

View File

@@ -0,0 +1,73 @@
import os
from cpl_core.configuration import ConfigurationABC
from cpl_core.environment import ApplicationEnvironmentABC
from cpl_core.mailing import EMail, EMailClientABC, EMailClientSettings
from cpl_translation import TranslatePipe
from bot_api.api import Api
from bot_api.logging.api_logger import ApiLogger
from bot_api.model.settings_dto import SettingsDTO
from bot_api.model.version_dto import VersionDTO
from bot_api.route.route import Route
class GuiController:
BasePath = f'/api/gui'
def __init__(
self,
config: ConfigurationABC,
env: ApplicationEnvironmentABC,
logger: ApiLogger,
t: TranslatePipe,
api: Api,
mail_settings: EMailClientSettings,
mailer: EMailClientABC
):
self._config = config
self._env = env
self._logger = logger
self._t = t
self._api = api
self._mail_settings = mail_settings
self._mailer = mailer
@Route.get(f'{BasePath}/api-version')
async def api_version(self):
import bot_api
version = bot_api.version_info
return VersionDTO(version.major, version.minor, version.micro).to_dict()
@Route.get(f'{BasePath}/settings')
async def settings(self):
# TODO: Authentication
import bot_api
version = bot_api.version_info
return SettingsDTO(
'',
VersionDTO(version.major, version.minor, version.micro),
os.path.abspath(os.path.join(self._env.working_directory, 'config')),
'',
'/',
0,
0,
self._mail_settings.user_name,
self._mail_settings.port,
self._mail_settings.host,
self._mail_settings.user_name,
self._mail_settings.user_name,
).to_dict()
@Route.get(f'{BasePath}/send-test-mail/<email>')
async def send_test_mail(self, email: str):
# TODO: Authentication
mail = EMail()
mail.add_header('Mime-Version: 1.0')
mail.add_header('Content-Type: text/plain; charset=utf-8')
mail.add_header('Content-Transfer-Encoding: quoted-printable')
mail.add_receiver(email)
mail.subject = self._t.transform('api.api.test_mail.subject')
mail.body = self._t.transform('api.api.test_mail.message').format(self._env.host_name, self._env.environment_name)
self._mailer.send_mail(mail)

View File

@@ -0,0 +1,26 @@
# -*- coding: utf-8 -*-
"""
bot Keksdose bot
~~~~~~~~~~~~~~~~~~~
Discord bot for the Keksdose discord Server
:copyright: (c) 2022 sh-edraft.de
:license: MIT, see LICENSE for more details.
"""
__title__ = 'bot_api.exception'
__author__ = 'Sven Heidemann'
__license__ = 'MIT'
__copyright__ = 'Copyright (c) 2022 sh-edraft.de'
__version__ = '0.2.3'
from collections import namedtuple
# imports:
VersionInfo = namedtuple('VersionInfo', 'major minor micro')
version_info = VersionInfo(major='0', minor='2', micro='3')

View File

@@ -0,0 +1,19 @@
from enum import Enum
class ServiceErrorCode(Enum):
Unknown = 0
InvalidDependencies = 1
InvalidData = 2
NotFound = 3
DataAlreadyExists = 4
UnableToAdd = 5
UnableToDelete = 6
InvalidUser = 7
ConnectionFailed = 8
Timeout = 9
MailError = 10

View File

@@ -0,0 +1,13 @@
from bot_api.exception.service_error_code_enum import ServiceErrorCode
class ServiceException(Exception):
def __init__(self, error_code: ServiceErrorCode, message: str, *args):
Exception.__init__(self, *args)
self.error_code = error_code
self.message = message
def get_detailed_message(self) -> str:
return f'ServiceException - ErrorCode: {self.error_code} - ErrorMessage: {self.message}'

View File

@@ -0,0 +1,26 @@
# -*- coding: utf-8 -*-
"""
bot Keksdose bot
~~~~~~~~~~~~~~~~~~~
Discord bot for the Keksdose discord Server
:copyright: (c) 2022 sh-edraft.de
:license: MIT, see LICENSE for more details.
"""
__title__ = 'bot_api.filter'
__author__ = 'Sven Heidemann'
__license__ = 'MIT'
__copyright__ = 'Copyright (c) 2022 sh-edraft.de'
__version__ = '0.2.3'
from collections import namedtuple
# imports:
VersionInfo = namedtuple('VersionInfo', 'major minor micro')
version_info = VersionInfo(major='0', minor='2', micro='3')

View File

@@ -0,0 +1,23 @@
from bot_api.abc.select_criteria_abc import SelectCriteriaABC
class AuthUserSelectCriteria(SelectCriteriaABC):
def __init__(
self,
page_index: int,
page_size: int,
sort_direction: str,
sort_column: str,
first_name: str,
last_name: str,
email: str,
auth_role: int
):
SelectCriteriaABC.__init__(self, page_index, page_size, sort_direction, sort_column)
self.first_name = first_name
self.last_name = last_name
self.email = email
self.auth_role = auth_role

View File

@@ -0,0 +1,39 @@
from inspect import signature, Parameter
from cpl_core.utils import String
class JSONProcessor:
@staticmethod
def process(_t: type, values: dict) -> object:
args = []
sig = signature(_t.__init__)
for param in sig.parameters.items():
parameter = param[1]
if parameter.name == 'self' or parameter.annotation == Parameter.empty:
continue
name = String.convert_to_camel_case(parameter.name)
name = name.replace('Dto', 'DTO')
name_first_lower = String.first_to_lower(name)
if name in values or name_first_lower in values:
value = ''
if name in values:
value = values[name]
else:
value = values[name_first_lower]
if isinstance(value, dict):
value = JSONProcessor.process(parameter.annotation, value)
args.append(value)
elif parameter.default != Parameter.empty:
args.append(parameter.default)
else:
args.append(None)
return _t(*args)

View File

@@ -0,0 +1,26 @@
# -*- coding: utf-8 -*-
"""
bot Keksdose bot
~~~~~~~~~~~~~~~~~~~
Discord bot for the Keksdose discord Server
:copyright: (c) 2022 sh-edraft.de
:license: MIT, see LICENSE for more details.
"""
__title__ = 'bot_api.logging'
__author__ = 'Sven Heidemann'
__license__ = 'MIT'
__copyright__ = 'Copyright (c) 2022 sh-edraft.de'
__version__ = '0.2.3'
from collections import namedtuple
# imports:
VersionInfo = namedtuple('VersionInfo', 'major minor micro')
version_info = VersionInfo(major='0', minor='2', micro='3')

View File

@@ -0,0 +1,11 @@
from cpl_core.configuration import ConfigurationABC
from cpl_core.environment import ApplicationEnvironmentABC
from cpl_core.time import TimeFormatSettings
from bot_core.abc.custom_file_logger_abc import CustomFileLoggerABC
class ApiLogger(CustomFileLoggerABC):
def __init__(self, config: ConfigurationABC, time_format: TimeFormatSettings, env: ApplicationEnvironmentABC):
CustomFileLoggerABC.__init__(self, 'Api', config, time_format, env)

View File

@@ -0,0 +1,26 @@
# -*- coding: utf-8 -*-
"""
bot Keksdose bot
~~~~~~~~~~~~~~~~~~~
Discord bot for the Keksdose discord Server
:copyright: (c) 2022 sh-edraft.de
:license: MIT, see LICENSE for more details.
"""
__title__ = 'bot_api.model'
__author__ = 'Sven Heidemann'
__license__ = 'MIT'
__copyright__ = 'Copyright (c) 2022 sh-edraft.de'
__version__ = '0.2.3'
from collections import namedtuple
# imports:
VersionInfo = namedtuple('VersionInfo', 'major minor micro')
version_info = VersionInfo(major='0', minor='2', micro='3')

View File

@@ -0,0 +1,99 @@
from typing import Optional
from bot_api.abc.dto_abc import DtoABC
from bot_data.model.auth_role_enum import AuthRoleEnum
class AuthUserDTO(DtoABC):
def __init__(
self,
id: int,
first_name: str,
last_name: str,
email: str,
password: str,
confirmation_id: Optional[str],
auth_role: AuthRoleEnum,
):
DtoABC.__init__(self)
self._id = id
self._first_name = first_name
self._last_name = last_name
self._email = email
self._password = password
self._is_confirmed = confirmation_id is None
self._auth_role = auth_role
@property
def id(self) -> int:
return self._id
@property
def first_name(self) -> str:
return self._first_name
@first_name.setter
def first_name(self, value: str):
self._first_name = value
@property
def last_name(self) -> str:
return self._last_name
@last_name.setter
def last_name(self, value: str):
self._last_name = value
@property
def email(self) -> str:
return self._email
@email.setter
def email(self, value: str):
self._email = value
@property
def password(self) -> str:
return self._password
@password.setter
def password(self, value: str):
self._password = value
@property
def is_confirmed(self) -> Optional[str]:
return self._is_confirmed
@is_confirmed.setter
def is_confirmed(self, value: Optional[str]):
self._is_confirmed = value
@property
def auth_role(self) -> AuthRoleEnum:
return self._auth_role
@auth_role.setter
def auth_role(self, value: AuthRoleEnum):
self._auth_role = value
def from_dict(self, values: dict):
self._id = values['id']
self._first_name = values['firstName']
self._last_name = values['lastName']
self._email = values['email']
self._password = values['password']
self._is_confirmed = values['isConfirmed']
self._auth_role = values['authRole']
def to_dict(self) -> dict:
return {
'id': self._id,
'firstName': self._first_name,
'lastName': self._last_name,
'email': self._email,
'password': self._password,
'isConfirmed': self._is_confirmed,
'authRole': self._auth_role.value,
}

View File

@@ -0,0 +1,21 @@
from cpl_query.extension import List
from bot_api.abc.dto_abc import DtoABC
from bot_data.filtered_result import FilteredResult
class AuthUserFilteredResultDTO(DtoABC, FilteredResult):
def __init__(self, result: List = None, total_count: int = 0):
DtoABC.__init__(self)
FilteredResult.__init__(self, result, total_count)
def from_dict(self, values: dict):
self._result = values['users']
self._total_count = values['totalCount']
def to_dict(self) -> dict:
return {
'users': self.result,
'totalCount': self.total_count
}

View File

@@ -0,0 +1,21 @@
import traceback
from cpl_core.console import Console
from bot_api.abc.dto_abc import DtoABC
class EMailStringDTO(DtoABC):
def __init__(self, email: str):
DtoABC.__init__(self)
self._email = email
def from_dict(self, values: dict):
self._email = values['email']
def to_dict(self) -> dict:
return {
'email': self._email
}

View File

@@ -0,0 +1,34 @@
import traceback
from typing import Optional
from cpl_core.console import Console
from bot_api.abc.dto_abc import DtoABC
from bot_api.exception.service_error_code_enum import ServiceErrorCode
class ErrorDTO(DtoABC):
def __init__(self, error_code: Optional[ServiceErrorCode], message: str):
DtoABC.__init__(self)
self._error_code = ServiceErrorCode.Unknown if error_code is None else error_code
self._message = message
@property
def error_code(self) -> ServiceErrorCode:
return self._error_code
@property
def message(self) -> str:
return self._message
def from_dict(self, values: dict):
self._error_code = values['ErrorCode']
self._message = values['Message']
def to_dict(self) -> dict:
return {
'errorCode': int(self._error_code.value),
'message': self._message
}

View File

@@ -0,0 +1,32 @@
import traceback
from cpl_core.console import Console
from bot_api.abc.dto_abc import DtoABC
class ResetPasswordDTO(DtoABC):
def __init__(self, id: str, password: str):
DtoABC.__init__(self)
self._id = id
self._password = password
@property
def id(self) -> str:
return self._id
@property
def password(self) -> str:
return self._password
def from_dict(self, values: dict):
self._id = values['id']
self._password = values['password']
def to_dict(self) -> dict:
return {
'id': self._id,
'password': self._password
}

View File

@@ -0,0 +1,67 @@
from bot_api.abc.dto_abc import DtoABC
from bot_api.model.version_dto import VersionDTO
class SettingsDTO(DtoABC):
def __init__(
self,
web_version: str,
api_version: VersionDTO,
config_path: str,
web_base_url: str,
api_base_url: str,
token_expire_time: int,
refresh_token_expire_time: int,
mail_user: str,
mail_port: int,
mail_host: str,
mail_transceiver: str,
mail_transceiver_address: str,
):
DtoABC.__init__(self)
self._web_version = web_version
self._api_version = api_version
self._config_path = config_path
self._web_base_url = web_base_url
self._api_base_url = api_base_url
self._token_expire_time = token_expire_time
self._refresh_token_expire_time = refresh_token_expire_time
self._mail_user = mail_user
self._mail_port = mail_port
self._mail_host = mail_host
self._mail_transceiver = mail_transceiver
self._mail_transceiver_address = mail_transceiver_address
def from_dict(self, values: dict):
self._web_version = values['webVersion']
self._api_version.from_dict(values['apiVersion'])
self._config_path = values['configPath']
self._web_base_url = values['webBaseURL']
self._api_base_url = values['apiBaseURL']
self._token_expire_time = values['tokenExpireTime']
self._refresh_token_expire_time = values['refreshTokenExpireTime']
self._mail_user = values['mailUser']
self._mail_port = values['mailPort']
self._mail_host = values['mailHost']
self._mail_transceiver = values['mailTransceiver']
self._mail_transceiver_address = values['mailTransceiverAddress']
def to_dict(self) -> dict:
return {
'webVersion': self._web_version,
'apiVersion': self._api_version.to_dict(),
'configPath': self._config_path,
'webBaseURL': self._web_base_url,
'apiBaseURL': self._api_base_url,
'tokenExpireTime': self._token_expire_time,
'refreshTokenExpireTime': self._refresh_token_expire_time,
'mailUser': self._mail_user,
'mailPort': self._mail_port,
'mailHost': self._mail_host,
'mailTransceiver': self._mail_transceiver,
'mailTransceiverAddress': self._mail_transceiver_address,
}

View File

@@ -0,0 +1,32 @@
import traceback
from cpl_core.console import Console
from bot_api.abc.dto_abc import DtoABC
class TokenDTO(DtoABC):
def __init__(self, token: str, refresh_token: str):
DtoABC.__init__(self)
self._token = token
self._refresh_token = refresh_token
@property
def token(self) -> str:
return self._token
@property
def refresh_token(self) -> str:
return self._refresh_token
def from_dict(self, values: dict):
self._token = values['token']
self._refresh_token = values['refreshToken']
def to_dict(self) -> dict:
return {
'token': self._token,
'refreshToken': self._refresh_token
}

View File

@@ -0,0 +1,45 @@
import traceback
from cpl_core.console import Console
from bot_api.abc.dto_abc import DtoABC
from bot_api.model.auth_user_dto import AuthUserDTO
class UpdateAuthUserDTO(DtoABC):
def __init__(
self,
auth_user_dto: AuthUserDTO,
new_auth_user_dto: AuthUserDTO,
change_password=False
):
DtoABC.__init__(self)
self._auth_user = auth_user_dto
self._new_auth_user = new_auth_user_dto
self._change_password = change_password
@property
def auth_user(self) -> AuthUserDTO:
return self._auth_user
@property
def new_auth_user(self) -> AuthUserDTO:
return self._new_auth_user
@property
def change_password(self) -> bool:
return self._change_password
def from_dict(self, values: dict):
self._auth_user = values['authUser']
self._new_auth_user = values['newAuthUser']
self._change_password = False if 'changePassword' not in values else values['changePassword']
def to_dict(self) -> dict:
return {
'authUser': self._auth_user,
'newAuthUser': self._new_auth_user,
'changePassword': self._change_password
}

View File

@@ -0,0 +1,27 @@
import traceback
from cpl_core.console import Console
from bot_api.abc.dto_abc import DtoABC
class VersionDTO(DtoABC):
def __init__(self, major: str = None, minor: str = None, micro: str = None):
DtoABC.__init__(self)
self._major = major
self._minor = minor
self._micro = micro
def from_dict(self, values: dict):
self._major = values['major']
self._minor = values['minor']
self._micro = values['micro']
def to_dict(self) -> dict:
return {
'major': self._major,
'minor': self._minor,
'micro': self._micro,
}

View File

@@ -0,0 +1,26 @@
# -*- coding: utf-8 -*-
"""
bot Keksdose bot
~~~~~~~~~~~~~~~~~~~
Discord bot for the Keksdose discord Server
:copyright: (c) 2022 sh-edraft.de
:license: MIT, see LICENSE for more details.
"""
__title__ = 'bot_api.route'
__author__ = 'Sven Heidemann'
__license__ = 'MIT'
__copyright__ = 'Copyright (c) 2022 sh-edraft.de'
__version__ = '0.2.3'
from collections import namedtuple
# imports:
VersionInfo = namedtuple('VersionInfo', 'major minor micro')
version_info = VersionInfo(major='0', minor='2', micro='3')

View File

@@ -0,0 +1,35 @@
from flask_cors import cross_origin
class Route:
registered_routes = {}
@classmethod
def route(cls, path=None, **kwargs):
# simple decorator for class based views
def inner(fn):
cross_origin(fn)
cls.registered_routes[path] = (fn, kwargs)
return fn
return inner
@classmethod
def get(cls, path=None, **kwargs):
return cls.route(path, methods=['GET'], **kwargs)
@classmethod
def post(cls, path=None, **kwargs):
return cls.route(path, methods=['POST'], **kwargs)
@classmethod
def head(cls, path=None, **kwargs):
return cls.route(path, methods=['HEAD'], **kwargs)
@classmethod
def put(cls, path=None, **kwargs):
return cls.route(path, methods=['PUT'], **kwargs)
@classmethod
def delete(cls, path=None, **kwargs):
return cls.route(path, methods=['DELETE'], **kwargs)

View File

@@ -0,0 +1,26 @@
# -*- coding: utf-8 -*-
"""
bot Keksdose bot
~~~~~~~~~~~~~~~~~~~
Discord bot for the Keksdose discord Server
:copyright: (c) 2022 sh-edraft.de
:license: MIT, see LICENSE for more details.
"""
__title__ = 'bot_api.service'
__author__ = 'Sven Heidemann'
__license__ = 'MIT'
__copyright__ = 'Copyright (c) 2022 sh-edraft.de'
__version__ = '0.2.3'
from collections import namedtuple
# imports
VersionInfo = namedtuple('VersionInfo', 'major minor micro')
version_info = VersionInfo(major='0', minor='2', micro='3')

View File

@@ -0,0 +1,385 @@
import hashlib
import re
import uuid
from datetime import datetime, timedelta, timezone
from typing import Optional
import jwt
from cpl_core.database.context import DatabaseContextABC
from cpl_core.mailing import EMailClientABC, EMail
from cpl_query.extension import List
from cpl_translation import TranslatePipe
from bot_api.abc.auth_service_abc import AuthServiceABC
from bot_api.configuration.authentication_settings import AuthenticationSettings
from bot_api.configuration.frontend_settings import FrontendSettings
from bot_api.exception.service_error_code_enum import ServiceErrorCode
from bot_api.exception.service_exception import ServiceException
from bot_api.filter.auth_user_select_criteria import AuthUserSelectCriteria
from bot_api.logging.api_logger import ApiLogger
from bot_api.model.auth_user_dto import AuthUserDTO
from bot_api.model.auth_user_filtered_result_dto import AuthUserFilteredResultDTO
from bot_api.model.email_string_dto import EMailStringDTO
from bot_api.model.reset_password_dto import ResetPasswordDTO
from bot_api.model.token_dto import TokenDTO
from bot_api.model.update_auth_user_dto import UpdateAuthUserDTO
from bot_api.transformer.auth_user_transformer import AuthUserTransformer as AUT
from bot_data.abc.auth_user_repository_abc import AuthUserRepositoryABC
from bot_data.model.auth_user import AuthUser
class AuthService(AuthServiceABC):
def __init__(
self,
logger: ApiLogger,
auth_users: AuthUserRepositoryABC,
db: DatabaseContextABC,
mailer: EMailClientABC,
t: TranslatePipe,
auth_settings: AuthenticationSettings,
frontend_settings: FrontendSettings,
):
AuthServiceABC.__init__(self)
self._logger = logger
self._auth_users = auth_users
self._db = db
self._mailer = mailer
self._t = t
self._auth_settings = auth_settings
self._frontend_settings = frontend_settings
@staticmethod
def _get_mail_to_send() -> EMail:
mail = EMail()
mail.add_header('Mime-Version: 1.0')
mail.add_header('Content-Type: text/plain charset=utf-8')
mail.add_header('Content-Transfer-Encoding: quoted-printable')
return mail
@staticmethod
def _hash_sha256(password: str) -> str:
return hashlib.sha256(password.encode('utf-8')).hexdigest()
@staticmethod
def _is_email_valid(email: str) -> bool:
if re.match(re.compile(r'^[a-z0-9]+[\._]?[a-z0-9]+[@]\w+[.]\w{2,3}$'), email) is not None:
return True
return False
def _generate_token(self, user: AuthUser) -> str:
token = jwt.encode(
payload={
'user_id': user.id,
'email': user.email,
'role': user.auth_role.value,
'exp': datetime.now(tz=timezone.utc) + timedelta(days=self._auth_settings.token_expire_time),
'iss': self._auth_settings.issuer,
'aud': self._auth_settings.audience
},
key=self._auth_settings.secret_key
)
return token
def _decode_token(self, token: str) -> dict:
return jwt.decode(
token,
key=self._auth_settings.secret_key,
issuer=self._auth_settings.issuer,
audience=self._auth_settings.audience,
algorithms=['HS256']
)
def _create_and_save_refresh_token(self, user: AuthUser) -> str:
token = str(uuid.uuid4())
user.refresh_token = token
user.refresh_token_expire_time = datetime.now(tz=timezone.utc) + timedelta(days=self._auth_settings.refresh_token_expire_time)
self._auth_users.update_auth_user(user)
self._db.save_changes()
return token
def _send_confirmation_id_to_user(self, user: AuthUser):
url = self._frontend_settings.url
if not url.endswith('/'):
url = f'{url}/'
mail = self._get_mail_to_send()
mail.add_receiver(user.email)
mail.subject = self._t.transform('api.auth.confirmation.subject').format(user.first_name, user.last_name)
mail.body = self._t.transform('api.auth.confirmation.message').format(url, user.confirmation_id)
self._mailer.send_mail(mail)
def _send_forgot_password_id_to_user(self, user: AuthUser):
url = self._frontend_settings.url
if not url.endswith('/'):
url = f'{url}/'
mail = self._get_mail_to_send()
mail.add_receiver(user.email)
mail.subject = self._t.transform('api.auth.forgot_password.subject').format(user.first_name, user.last_name)
mail.body = self._t.transform('api.auth.forgot_password.message').format(url, user.forgot_password_id)
self._mailer.send_mail(mail)
async def get_all_auth_users_async(self) -> List[AuthUserDTO]:
result = self._auth_users.get_all_auth_users() \
.select(lambda x: AUT.to_dto(x))
return List(AuthUserDTO, result)
async def get_filtered_auth_users_async(self, criteria: AuthUserSelectCriteria) -> AuthUserFilteredResultDTO:
users = self._auth_users.get_filtered_auth_users(criteria)
result = users.result.select(lambda x: AUT.to_dto(x))
return AuthUserFilteredResultDTO(
List(AuthUserDTO, result),
users.total_count
)
async def get_auth_user_by_email_async(self, email: str) -> AuthUserDTO:
try:
return AUT.to_dto(self._auth_users.get_auth_user_by_email(email))
except Exception as e:
self._logger.error(__name__, f'AuthUser not found', e)
raise ServiceException(ServiceErrorCode.InvalidData, f'User not found {email}')
async def find_auth_user_by_email_async(self, email: str) -> Optional[AuthUser]:
user = self._auth_users.find_auth_user_by_email(email)
return AUT.to_dto(user) if user is not None else None
async def add_auth_user_async(self, user_dto: AuthUser):
db_user = self._auth_users.find_auth_user_by_email(user_dto.email)
if db_user is not None:
raise ServiceException(ServiceErrorCode.InvalidUser, 'User already exists')
user_dto.password = self._hash_sha256(user_dto.password)
user = AUT.to_db(user_dto)
if not self._is_email_valid(user.email):
raise ServiceException(ServiceErrorCode.InvalidData, 'Invalid E-Mail address')
try:
user.confirmation_id = uuid.uuid4()
self._auth_users.add_auth_user(user)
self._send_confirmation_id_to_user(user)
self._db.save_changes()
self._logger.info(__name__, f'Added auth user with E-Mail: {user_dto.email}')
except Exception as e:
self._logger.error(__name__, f'Cannot add user with E-Mal {user_dto.email}', e)
raise ServiceException(ServiceErrorCode.UnableToAdd, "Invalid E-Mail")
async def update_user_async(self, update_user_dto: UpdateAuthUserDTO):
if update_user_dto is None:
raise ServiceException(ServiceErrorCode.InvalidData, f'User is empty')
if update_user_dto.auth_user is None:
raise ServiceException(ServiceErrorCode.InvalidData, f'Existing user is empty')
if update_user_dto.new_auth_user is None:
raise ServiceException(ServiceErrorCode.InvalidData, f'New user is empty')
if not self._is_email_valid(update_user_dto.auth_user.email) or not self._is_email_valid(update_user_dto.new_auth_user.email):
raise ServiceException(ServiceErrorCode.InvalidData, f'Invalid E-Mail')
user = self._auth_users.find_auth_user_by_email(update_user_dto.auth_user.email)
if user is None:
raise ServiceException(ServiceErrorCode.InvalidUser, 'User not found')
if user.confirmation_id is not None:
raise ServiceException(ServiceErrorCode.InvalidUser, 'E-Mail not confirmed')
# update first name
if update_user_dto.new_auth_user.first_name is not None and update_user_dto.auth_user.first_name != update_user_dto.new_auth_user.first_name:
user.first_name = update_user_dto.new_auth_user.first_name
# update last name
if update_user_dto.new_auth_user.last_name is not None and update_user_dto.new_auth_user.last_name != '' and \
update_user_dto.auth_user.last_name != update_user_dto.new_auth_user.last_name:
user.last_name = update_user_dto.new_auth_user.last_name
# update E-Mail
if update_user_dto.new_auth_user.email is not None and update_user_dto.new_auth_user.email != '' and update_user_dto.auth_user.email != update_user_dto.new_auth_user.email:
user_by_new_e_mail = self._auth_users.find_auth_user_by_email(update_user_dto.new_auth_user.email)
if user_by_new_e_mail is not None:
raise ServiceException(ServiceErrorCode.InvalidUser, 'User already exists')
user.email = update_user_dto.new_auth_user.email
is_existing_password_set = False
is_new_password_set = False
# hash passwords in DTOs
if update_user_dto.auth_user.password is not None and update_user_dto.auth_user.password != '':
is_existing_password_set = True
update_user_dto.auth_user.password = self._hash_sha256(update_user_dto.auth_user.password)
if update_user_dto.auth_user.password != user.password:
raise ServiceException(ServiceErrorCode.InvalidUser, 'Wrong password')
if update_user_dto.new_auth_user.password is not None and update_user_dto.new_auth_user.password != '':
is_new_password_set = True
update_user_dto.new_auth_user.password = self._hash_sha256(update_user_dto.new_auth_user.password)
# update password
if is_existing_password_set and is_new_password_set and update_user_dto.auth_user.password != update_user_dto.new_auth_user.password:
user.password = update_user_dto.new_auth_user.password
self._auth_users.update_auth_user(user)
self._db.save_changes()
async def update_user_as_admin_async(self, update_user_dto: UpdateAuthUserDTO):
if update_user_dto is None:
raise ServiceException(ServiceErrorCode.InvalidData, f'User is empty')
if update_user_dto.auth_user is None:
raise ServiceException(ServiceErrorCode.InvalidData, f'Existing user is empty')
if update_user_dto.new_auth_user is None:
raise ServiceException(ServiceErrorCode.InvalidData, f'New user is empty')
if not self._is_email_valid(update_user_dto.auth_user.email) or not self._is_email_valid(update_user_dto.new_auth_user.email):
raise ServiceException(ServiceErrorCode.InvalidData, f'Invalid E-Mail')
user = self._auth_users.find_auth_user_by_email(update_user_dto.auth_user.email)
if user is None:
raise ServiceException(ServiceErrorCode.InvalidUser, 'User not found')
if user.confirmation_id is not None and update_user_dto.new_auth_user.is_confirmed:
user.confirmation_id = None
elif user.confirmation_id is None and not update_user_dto.new_auth_user.is_confirmed:
user.confirmation_id = uuid.uuid4()
# else
# raise ServiceException(ServiceErrorCode.InvalidUser, 'E-Mail not confirmed')
# update first name
if update_user_dto.new_auth_user.first_name is not None and update_user_dto.auth_user.first_name != update_user_dto.new_auth_user.first_name:
user.first_name = update_user_dto.new_auth_user.first_name
# update last name
if update_user_dto.new_auth_user.last_name is not None and update_user_dto.new_auth_user.last_name != '' and update_user_dto.auth_user.last_name != update_user_dto.new_auth_user.last_name:
user.last_name = update_user_dto.new_auth_user.last_name
# update E-Mail
if update_user_dto.new_auth_user.email is not None and update_user_dto.new_auth_user.email != '' and update_user_dto.auth_user.email != update_user_dto.new_auth_user.email:
user_by_new_e_mail = self._auth_users.find_auth_user_by_email(update_user_dto.new_auth_user.email)
if user_by_new_e_mail is not None:
raise ServiceException(ServiceErrorCode.InvalidUser, 'User already exists')
user.email = update_user_dto.new_auth_user.email
# update password
if update_user_dto.change_password and update_user_dto.auth_user.password != update_user_dto.new_auth_user.password:
user.password = self._hash_sha256(update_user_dto.new_auth_user.password)
# update role
if user.auth_role == update_user_dto.auth_user.auth_role and user.auth_role != update_user_dto.new_auth_user.auth_role:
user.auth_role = update_user_dto.new_auth_user.auth_role
self._db.save_changes()
async def delete_auth_user_by_email_async(self, email: str):
try:
user = self._auth_users.get_auth_user_by_email(email)
self._auth_users.delete_auth_user(user)
self._db.save_changes()
except Exception as e:
self._logger.error(__name__, f'Cannot delete user', e)
raise ServiceException(ServiceErrorCode.UnableToDelete, f'Cannot delete user by mail {email}')
async def delete_auth_user_async(self, user_dto: AuthUser):
try:
self._auth_users.delete_auth_user(AUT.to_db(user_dto))
self._db.save_changes()
except Exception as e:
self._logger.error(__name__, f'Cannot delete user', e)
raise ServiceException(ServiceErrorCode.UnableToDelete, f'Cannot delete user by mail {user_dto.email}')
async def login_async(self, user_dto: AuthUser) -> TokenDTO:
if user_dto is None:
raise ServiceException(ServiceErrorCode.InvalidData, 'User not set')
db_user = self._auth_users.find_auth_user_by_email(user_dto.email)
if db_user is None:
raise ServiceException(ServiceErrorCode.InvalidUser, f'User not found')
user_dto.password = self._hash_sha256(user_dto.password)
if db_user.password != user_dto.password:
raise ServiceException(ServiceErrorCode.InvalidUser, 'Wrong password')
token = self._generate_token(db_user)
refresh_token = self._create_and_save_refresh_token(db_user)
if db_user.forgot_password_id is not None:
db_user.forgot_password_id = None
self._db.save_changes()
return TokenDTO(token, refresh_token)
async def refresh_async(self, token_dto: TokenDTO) -> TokenDTO:
if token_dto is None:
raise ServiceException(ServiceErrorCode.InvalidData, f'Token not set')
token = self._decode_token(token_dto.token)
if token is None or 'email' not in token:
raise ServiceException(ServiceErrorCode.InvalidData, 'Token invalid')
try:
user = self._auth_users.get_auth_user_by_email(token['email'])
if user is None or user.refresh_token != token_dto.refresh_token or user.refresh_token_expire_time <= datetime.now():
raise ServiceException(ServiceErrorCode.InvalidData, 'Token expired')
return TokenDTO(self._generate_token(user), self._create_and_save_refresh_token(user))
except Exception as e:
self._logger.error(__name__, f'Refreshing token failed', e)
return TokenDTO('', '')
async def revoke_async(self, token_dto: TokenDTO):
if token_dto is None or token_dto.token is None or token_dto.refresh_token is None:
raise ServiceException(ServiceErrorCode.InvalidData, 'Token not set')
token = self._decode_token(token_dto.token)
try:
user = self._auth_users.get_auth_user_by_email(token['email'])
if user is None or user.refresh_token != token_dto.refresh_token or user.refresh_token_expire_time <= datetime.now():
raise ServiceException(ServiceErrorCode.InvalidData, 'Token expired')
user.refresh_token = None
self._auth_users.update_auth_user(user)
self._db.save_changes()
except Exception as e:
self._logger.error(__name__, f'Refreshing token failed', e)
async def confirm_email_async(self, id: str) -> bool:
user = self._auth_users.find_auth_user_by_confirmation_id(id)
if user is None:
return False
user.confirmation_id = None
self._auth_users.update_auth_user(user)
self._db.save_changes()
return True
async def forgot_password_async(self, email: str):
user = self._auth_users.find_auth_user_by_email(email)
if user is None:
return
user.forgot_password_id = uuid.uuid4()
self._auth_users.update_auth_user(user)
self._send_forgot_password_id_to_user(user)
self._db.save_changes()
async def confirm_forgot_password_async(self, id: str) -> EMailStringDTO:
user = self._auth_users.find_auth_user_by_forgot_password_id(id)
return EMailStringDTO(user.email)
async def reset_password_async(self, rp_dto: ResetPasswordDTO):
user = self._auth_users.find_auth_user_by_forgot_password_id(rp_dto.id)
if user is None:
raise ServiceException(ServiceErrorCode.InvalidUser, f'User by forgot password id {rp_dto.id} not found')
if user.confirmation_id is not None:
raise ServiceException(ServiceErrorCode.InvalidUser, f'E-Mail not confirmed')
if user.password is None or rp_dto.password == '':
raise ServiceException(ServiceErrorCode.InvalidData, f'Password not set')
user.password = self._hash_sha256(rp_dto.password)
self._db.save_changes()

View File

@@ -0,0 +1,26 @@
# -*- coding: utf-8 -*-
"""
bot Keksdose bot
~~~~~~~~~~~~~~~~~~~
Discord bot for the Keksdose discord Server
:copyright: (c) 2022 sh-edraft.de
:license: MIT, see LICENSE for more details.
"""
__title__ = 'bot_api.transformer'
__author__ = 'Sven Heidemann'
__license__ = 'MIT'
__copyright__ = 'Copyright (c) 2022 sh-edraft.de'
__version__ = '0.2.3'
from collections import namedtuple
# imports:
VersionInfo = namedtuple('VersionInfo', 'major minor micro')
version_info = VersionInfo(major='0', minor='2', micro='3')

View File

@@ -0,0 +1,36 @@
from datetime import datetime, timezone
from bot_api.abc.auth_user_transformer_abc import AuthUserTransformerABC
from bot_api.model.auth_user_dto import AuthUserDTO
from bot_data.model.auth_role_enum import AuthRoleEnum
from bot_data.model.auth_user import AuthUser
class AuthUserTransformer(AuthUserTransformerABC):
@staticmethod
def to_db(dto: AuthUser) -> AuthUser:
return AuthUser(
dto.first_name,
dto.last_name,
dto.email,
dto.password,
None,
None,
None,
datetime.now(tz=timezone.utc),
AuthRoleEnum.normal if dto.auth_role is None else dto.auth_role,
id=0 if dto.id is None else dto.id
)
@staticmethod
def to_dto(db: AuthUser) -> AuthUserDTO:
return AuthUserDTO(
db.id,
db.first_name,
db.last_name,
db.email,
db.password,
db.confirmation_id,
db.auth_role
)