Compare commits
2 Commits
2025.09.17
...
2025.09.19
| Author | SHA1 | Date | |
|---|---|---|---|
| 2be58f6577 | |||
| 9c6078f4fd |
1
.gitignore
vendored
1
.gitignore
vendored
@@ -113,6 +113,7 @@ venv.bak/
|
||||
|
||||
# Custom Environments
|
||||
cpl-env/
|
||||
.secret
|
||||
|
||||
# Spyder project settings
|
||||
.spyderproject
|
||||
|
||||
@@ -3,6 +3,9 @@ from typing import Callable, Self
|
||||
|
||||
from cpl.application.host import Host
|
||||
from cpl.core.console.console import Console
|
||||
from cpl.core.environment.environment import Environment
|
||||
from cpl.core.log.logger_abc import LoggerABC
|
||||
from cpl.core.log.log_level_enum import LogLevel
|
||||
from cpl.dependency.service_provider_abc import ServiceProviderABC
|
||||
|
||||
|
||||
@@ -38,6 +41,13 @@ class ApplicationABC(ABC):
|
||||
setattr(cls, name, func)
|
||||
return cls
|
||||
|
||||
def with_logging(self, level: LogLevel = None):
|
||||
if level is None:
|
||||
level = Environment.get("LOG_LEVEL", LogLevel, LogLevel.info)
|
||||
|
||||
logger = self._services.get_service(LoggerABC)
|
||||
logger.set_level(level)
|
||||
|
||||
def with_permissions(self, *args, **kwargs):
|
||||
__not_implemented__("cpl-auth", self.with_permissions)
|
||||
|
||||
|
||||
@@ -1,25 +1,27 @@
|
||||
import secrets
|
||||
from datetime import datetime
|
||||
from typing import Optional
|
||||
from typing import Optional, Union
|
||||
|
||||
from async_property import async_property
|
||||
|
||||
from cpl.auth.permission.permissions import Permissions
|
||||
from cpl.core.environment import Environment
|
||||
from cpl.core.log import Logger
|
||||
from cpl.core.typing import SerialId, Id
|
||||
from cpl.database.abc import DbModelABC
|
||||
from cpl.dependency import ServiceProviderABC
|
||||
from cpl.core.environment.environment import Environment
|
||||
from cpl.core.log.logger import Logger
|
||||
from cpl.core.typing import Id, SerialId
|
||||
from cpl.core.utils.credential_manager import CredentialManager
|
||||
from cpl.database.abc.db_model_abc import DbModelABC
|
||||
from cpl.dependency.service_provider_abc import ServiceProviderABC
|
||||
|
||||
_logger = Logger(__name__)
|
||||
|
||||
|
||||
class ApiKey(DbModelABC):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
id: SerialId,
|
||||
identifier: str,
|
||||
key: str,
|
||||
key: Union[str, bytes],
|
||||
deleted: bool = False,
|
||||
editor_id: Optional[Id] = None,
|
||||
created: Optional[datetime] = None,
|
||||
@@ -37,12 +39,17 @@ class ApiKey(DbModelABC):
|
||||
def key(self) -> str:
|
||||
return self._key
|
||||
|
||||
@property
|
||||
def plain_key(self) -> str:
|
||||
return CredentialManager.decrypt(self.key)
|
||||
|
||||
@async_property
|
||||
async def permissions(self):
|
||||
from cpl.auth.schema._permission.api_key_permission_dao import ApiKeyPermissionDao
|
||||
|
||||
api_key_permission_dao: ApiKeyPermissionDao = ServiceProviderABC.get_global_service(ApiKeyPermissionDao)
|
||||
return [await x.permission for x in await api_key_permission_dao.find_by_api_key_id(self.id)]
|
||||
apiKeyPermissionDao = ServiceProviderABC.get_global_provider().get_service(ApiKeyPermissionDao)
|
||||
|
||||
return [await x.permission for x in await apiKeyPermissionDao.find_by_api_key_id(self.id)]
|
||||
|
||||
async def has_permission(self, permission: Permissions) -> bool:
|
||||
return permission.value in [x.name for x in await self.permissions]
|
||||
@@ -52,7 +59,7 @@ class ApiKey(DbModelABC):
|
||||
|
||||
@staticmethod
|
||||
def new_key() -> str:
|
||||
return f"api_{secrets.token_urlsafe(Environment.get("API_KEY_LENGTH", int, 64))}"
|
||||
return CredentialManager.encrypt(f"api_{secrets.token_urlsafe(Environment.get("API_KEY_LENGTH", int, 64))}")
|
||||
|
||||
@classmethod
|
||||
def new(cls, identifier: str) -> "ApiKey":
|
||||
|
||||
@@ -7,7 +7,6 @@ from typing import Any
|
||||
from cpl.core.configuration.configuration_model_abc import ConfigurationModelABC
|
||||
from cpl.core.console.console import Console
|
||||
from cpl.core.console.foreground_color_enum import ForegroundColorEnum
|
||||
from cpl.core.environment.environment import Environment
|
||||
from cpl.core.typing import D, T
|
||||
from cpl.core.utils.json_processor import JSONProcessor
|
||||
|
||||
@@ -88,6 +87,8 @@ class Configuration:
|
||||
if os.path.isabs(name):
|
||||
file_path = name
|
||||
else:
|
||||
from cpl.core.environment import Environment
|
||||
|
||||
path_root = Environment.get_cwd()
|
||||
if path is not None:
|
||||
path_root = path
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
import os
|
||||
from socket import gethostname
|
||||
from typing import Optional, Type
|
||||
from typing import Type
|
||||
|
||||
from cpl.core.environment.environment_enum import EnvironmentEnum
|
||||
from cpl.core.typing import T
|
||||
from cpl.core.typing import T, D
|
||||
from cpl.core.utils.get_value import get_value
|
||||
|
||||
|
||||
@@ -55,14 +55,14 @@ class Environment:
|
||||
os.environ[key] = str(value)
|
||||
|
||||
@staticmethod
|
||||
def get(key: str, cast_type: Type[T], default: Optional[T] = None) -> Optional[T]:
|
||||
def get(key: str, cast_type: Type[T], default: D = None) -> T | D:
|
||||
"""
|
||||
Get an environment variable and cast it to a specified type.
|
||||
:param str key: The name of the environment variable.
|
||||
:param Type[T] cast_type: A callable to cast the variable's value.
|
||||
:param Optional[T] default: The default value to return if the variable is not found. Defaults to None.The default value to return if the variable is not found. Defaults to None.
|
||||
:param T default: The default value to return if the variable is not found. Defaults to None.The default value to return if the variable is not found. Defaults to None.
|
||||
:return: The casted value, or None if the variable is not found.
|
||||
:rtype: Optional[T]
|
||||
:rtype: T | D
|
||||
"""
|
||||
|
||||
return get_value(dict(os.environ), key, cast_type, default)
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
from .logger import Logger
|
||||
from .logger_abc import LoggerABC
|
||||
from .log_level_enum import LogLevelEnum
|
||||
from .log_level_enum import LogLevel
|
||||
from .logging_settings import LogSettings
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
from enum import Enum
|
||||
|
||||
|
||||
class LogLevelEnum(Enum):
|
||||
class LogLevel(Enum):
|
||||
off = "OFF" # Nothing
|
||||
trace = "TRC" # Detailed app information's
|
||||
debug = "DEB" # Detailed app state
|
||||
|
||||
@@ -3,28 +3,31 @@ import traceback
|
||||
from datetime import datetime
|
||||
|
||||
from cpl.core.console import Console
|
||||
from cpl.core.log.log_level_enum import LogLevelEnum
|
||||
from cpl.core.log.log_level_enum import LogLevel
|
||||
from cpl.core.log.logger_abc import LoggerABC
|
||||
from cpl.core.typing import Messages, Source
|
||||
|
||||
|
||||
class Logger(LoggerABC):
|
||||
_level = LogLevelEnum.info
|
||||
_levels = [x for x in LogLevelEnum]
|
||||
_level = LogLevel.info
|
||||
_levels = [x for x in LogLevel]
|
||||
|
||||
# ANSI color codes for different log levels
|
||||
_COLORS = {
|
||||
LogLevelEnum.trace: "\033[37m", # Light Gray
|
||||
LogLevelEnum.debug: "\033[94m", # Blue
|
||||
LogLevelEnum.info: "\033[92m", # Green
|
||||
LogLevelEnum.warning: "\033[93m", # Yellow
|
||||
LogLevelEnum.error: "\033[91m", # Red
|
||||
LogLevelEnum.fatal: "\033[95m", # Magenta
|
||||
LogLevel.trace: "\033[37m", # Light Gray
|
||||
LogLevel.debug: "\033[94m", # Blue
|
||||
LogLevel.info: "\033[92m", # Green
|
||||
LogLevel.warning: "\033[93m", # Yellow
|
||||
LogLevel.error: "\033[91m", # Red
|
||||
LogLevel.fatal: "\033[95m", # Magenta
|
||||
}
|
||||
|
||||
def __init__(self, source: Source, file_prefix: str = None):
|
||||
LoggerABC.__init__(self)
|
||||
assert source is not None and source != "", "Source cannot be None or empty"
|
||||
|
||||
if source == LoggerABC.__name__:
|
||||
source = None
|
||||
|
||||
self._source = source
|
||||
|
||||
if file_prefix is None:
|
||||
@@ -45,7 +48,7 @@ class Logger(LoggerABC):
|
||||
os.makedirs("logs")
|
||||
|
||||
@classmethod
|
||||
def set_level(cls, level: LogLevelEnum):
|
||||
def set_level(cls, level: LogLevel):
|
||||
if level in cls._levels:
|
||||
cls._level = level
|
||||
else:
|
||||
@@ -69,7 +72,7 @@ class Logger(LoggerABC):
|
||||
log_file.write(content + "\n")
|
||||
log_file.close()
|
||||
|
||||
def _log(self, level: LogLevelEnum, *messages: Messages):
|
||||
def _log(self, level: LogLevel, *messages: Messages):
|
||||
try:
|
||||
if self._levels.index(level) < self._levels.index(self._level):
|
||||
return
|
||||
@@ -78,7 +81,7 @@ class Logger(LoggerABC):
|
||||
formatted_message = self._format_message(level.value, timestamp, *messages)
|
||||
|
||||
self._write_log_to_file(formatted_message)
|
||||
Console.write_line(f"{self._COLORS.get(self._level, '\033[0m')}{formatted_message}\033[0m")
|
||||
Console.write_line(f"{self._COLORS.get(level, '\033[0m')}{formatted_message}\033[0m")
|
||||
except Exception as e:
|
||||
print(f"Error while logging: {e} -> {traceback.format_exc()}")
|
||||
|
||||
@@ -91,27 +94,35 @@ class Logger(LoggerABC):
|
||||
|
||||
messages = [str(message) for message in messages if message is not None]
|
||||
|
||||
return f"<{timestamp}> [{level.upper():^3}] [{self._file_prefix}] - [{self._source}]: {' '.join(messages)}"
|
||||
message = f"<{timestamp}>"
|
||||
message += f" [{level.upper():^3}]"
|
||||
message += f" [{self._file_prefix}]"
|
||||
if self._source is not None:
|
||||
message += f" - [{self._source}]"
|
||||
|
||||
message += f": {' '.join(messages)}"
|
||||
|
||||
return message
|
||||
|
||||
def header(self, string: str):
|
||||
self._log(LogLevelEnum.info, string)
|
||||
self._log(LogLevel.info, string)
|
||||
|
||||
def trace(self, *messages: Messages):
|
||||
self._log(LogLevelEnum.trace, *messages)
|
||||
self._log(LogLevel.trace, *messages)
|
||||
|
||||
def debug(self, *messages: Messages):
|
||||
self._log(LogLevelEnum.debug, *messages)
|
||||
self._log(LogLevel.debug, *messages)
|
||||
|
||||
def info(self, *messages: Messages):
|
||||
self._log(LogLevelEnum.info, *messages)
|
||||
self._log(LogLevel.info, *messages)
|
||||
|
||||
def warning(self, *messages: Messages):
|
||||
self._log(LogLevelEnum.warning, *messages)
|
||||
self._log(LogLevel.warning, *messages)
|
||||
|
||||
def error(self, message, e: Exception = None):
|
||||
self._log(LogLevelEnum.error, message, f"{e} -> {traceback.format_exc()}" if e else None)
|
||||
self._log(LogLevel.error, message, f"{e} -> {traceback.format_exc()}" if e else None)
|
||||
|
||||
def fatal(self, message, e: Exception = None, prevent_quit: bool = False):
|
||||
self._log(LogLevelEnum.fatal, message, f"{e} -> {traceback.format_exc()}" if e else None)
|
||||
self._log(LogLevel.fatal, message, f"{e} -> {traceback.format_exc()}" if e else None)
|
||||
if not prevent_quit:
|
||||
exit(-1)
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
from abc import abstractmethod, ABC
|
||||
|
||||
from cpl.core.log.log_level_enum import LogLevel
|
||||
from cpl.core.typing import Messages
|
||||
|
||||
|
||||
@@ -7,7 +8,7 @@ class LoggerABC(ABC):
|
||||
r"""ABC for :class:`cpl.core.log.logger_service.Logger`"""
|
||||
|
||||
@abstractmethod
|
||||
def set_level(self, level: str): ...
|
||||
def set_level(self, level: LogLevel): ...
|
||||
|
||||
@abstractmethod
|
||||
def _format_message(self, level: str, timestamp, *messages: Messages) -> str: ...
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
from typing import Optional
|
||||
|
||||
from cpl.core.configuration.configuration_model_abc import ConfigurationModelABC
|
||||
from cpl.core.log.log_level_enum import LogLevelEnum
|
||||
from cpl.core.log.log_level_enum import LogLevel
|
||||
|
||||
|
||||
class LogSettings(ConfigurationModelABC):
|
||||
@@ -11,14 +11,14 @@ class LogSettings(ConfigurationModelABC):
|
||||
self,
|
||||
path: str = None,
|
||||
filename: str = None,
|
||||
console_log_level: LogLevelEnum = None,
|
||||
file_log_level: LogLevelEnum = None,
|
||||
console_log_level: LogLevel = None,
|
||||
file_log_level: LogLevel = None,
|
||||
):
|
||||
ConfigurationModelABC.__init__(self)
|
||||
self._path: Optional[str] = path
|
||||
self._filename: Optional[str] = filename
|
||||
self._console: Optional[LogLevelEnum] = console_log_level
|
||||
self._level: Optional[LogLevelEnum] = file_log_level
|
||||
self._console: Optional[LogLevel] = console_log_level
|
||||
self._level: Optional[LogLevel] = file_log_level
|
||||
|
||||
@property
|
||||
def path(self) -> str:
|
||||
@@ -37,17 +37,17 @@ class LogSettings(ConfigurationModelABC):
|
||||
self._filename = filename
|
||||
|
||||
@property
|
||||
def console(self) -> LogLevelEnum:
|
||||
def console(self) -> LogLevel:
|
||||
return self._console
|
||||
|
||||
@console.setter
|
||||
def console(self, console: LogLevelEnum) -> None:
|
||||
def console(self, console: LogLevel) -> None:
|
||||
self._console = console
|
||||
|
||||
@property
|
||||
def level(self) -> LogLevelEnum:
|
||||
def level(self) -> LogLevel:
|
||||
return self._level
|
||||
|
||||
@level.setter
|
||||
def level(self, level: LogLevelEnum) -> None:
|
||||
def level(self, level: LogLevel) -> None:
|
||||
self._level = level
|
||||
|
||||
@@ -1,12 +1,40 @@
|
||||
import base64
|
||||
import os
|
||||
|
||||
from cryptography.fernet import Fernet
|
||||
|
||||
from cpl.core.log.logger import Logger
|
||||
|
||||
_logger = Logger(__name__)
|
||||
|
||||
class CredentialManager:
|
||||
r"""Handles credential encryption and decryption"""
|
||||
_secret: str = None
|
||||
|
||||
@staticmethod
|
||||
def encrypt(string: str) -> str:
|
||||
r"""Encode with base64
|
||||
@classmethod
|
||||
def with_secret(cls, file: str = None):
|
||||
if file is None:
|
||||
file = ".secret"
|
||||
|
||||
if not os.path.isfile(file):
|
||||
dirname = os.path.dirname(file)
|
||||
if dirname != "":
|
||||
os.makedirs(dirname, exist_ok=True)
|
||||
|
||||
with open(file, "w") as secret_file:
|
||||
secret_file.write(Fernet.generate_key().decode())
|
||||
secret_file.close()
|
||||
_logger.warning("Secret file not found, regenerating")
|
||||
|
||||
with open(file, "r") as secret_file:
|
||||
secret = secret_file.read().strip()
|
||||
if secret == "" or secret is None:
|
||||
_logger.fatal("No secret found in .secret file.")
|
||||
|
||||
cls._secret = str(secret)
|
||||
|
||||
@classmethod
|
||||
def encrypt(cls, string: str) -> str:
|
||||
r"""Encode with Fernet
|
||||
|
||||
Parameter:
|
||||
string: :class:`str`
|
||||
@@ -15,11 +43,11 @@ class CredentialManager:
|
||||
Returns:
|
||||
Encoded string
|
||||
"""
|
||||
return base64.b64encode(string.encode("utf-8")).decode("utf-8")
|
||||
return Fernet(cls._secret).encrypt(string.encode()).decode()
|
||||
|
||||
@staticmethod
|
||||
def decrypt(string: str) -> str:
|
||||
r"""Decode with base64
|
||||
@classmethod
|
||||
def decrypt(cls, string: str) -> str:
|
||||
r"""Decode with Fernet
|
||||
|
||||
Parameter:
|
||||
string: :class:`str`
|
||||
@@ -28,19 +56,4 @@ class CredentialManager:
|
||||
Returns:
|
||||
Decoded string
|
||||
"""
|
||||
return base64.b64decode(string).decode("utf-8")
|
||||
|
||||
@staticmethod
|
||||
def build_string(string: str, credentials: str):
|
||||
r"""Builds string with credentials in it
|
||||
|
||||
Parameter:
|
||||
string: :class:`str`
|
||||
String in which the variable is replaced by credentials
|
||||
credentials: :class:`str`
|
||||
String to encode
|
||||
|
||||
Returns:
|
||||
Decoded string
|
||||
"""
|
||||
return string.replace("$credentials", CredentialManager.decrypt(credentials))
|
||||
return Fernet(cls._secret).decrypt(string).decode()
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
from enum import Enum
|
||||
from typing import Type, Optional
|
||||
|
||||
from cpl.core.typing import T
|
||||
@@ -40,6 +41,19 @@ def get_value(
|
||||
if cast_type == bool:
|
||||
return value.lower() in ["true", "1"]
|
||||
|
||||
if issubclass(cast_type, Enum):
|
||||
try:
|
||||
return cast_type(value)
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
try:
|
||||
return cast_type[value]
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
return default
|
||||
|
||||
if (cast_type if not hasattr(cast_type, "__origin__") else cast_type.__origin__) == list:
|
||||
if not (value.startswith("[") and value.endswith("]")) and list_delimiter not in value:
|
||||
raise ValueError("List values must be enclosed in square brackets or use a delimiter.")
|
||||
|
||||
@@ -11,16 +11,18 @@ def _with_migrations(self: _ApplicationABC, *paths: list[str]) -> _ApplicationAB
|
||||
from cpl.application.host import Host
|
||||
|
||||
from cpl.database.service.migration_service import MigrationService
|
||||
|
||||
migration_service = self._services.get_service(MigrationService)
|
||||
migration_service.with_directory("./scripts")
|
||||
Host.run(migration_service.migrate)
|
||||
|
||||
return self
|
||||
|
||||
|
||||
def _with_seeders(self: _ApplicationABC) -> _ApplicationABC:
|
||||
from cpl.database.service.seeder_service import SeederService
|
||||
from cpl.application.host import Host
|
||||
|
||||
|
||||
seeder_service: SeederService = self._services.get_service(SeederService)
|
||||
Host.run(seeder_service.seed)
|
||||
return self
|
||||
|
||||
@@ -6,7 +6,7 @@ from typing import Generic, Optional, Union, Type, List, Any
|
||||
|
||||
from cpl.core.ctx import get_user
|
||||
from cpl.core.typing import T, Id
|
||||
from cpl.core.utils import String
|
||||
from cpl.core.utils.string import String
|
||||
from cpl.core.utils.get_value import get_value
|
||||
from cpl.database.abc.db_context_abc import DBContextABC
|
||||
from cpl.database.const import DATETIME_FORMAT
|
||||
|
||||
@@ -3,7 +3,6 @@ from typing import Optional
|
||||
from cpl.core.configuration import Configuration
|
||||
from cpl.core.configuration.configuration_model_abc import ConfigurationModelABC
|
||||
from cpl.core.environment import Environment
|
||||
from cpl.core.utils import Base64
|
||||
|
||||
|
||||
class DatabaseSettings(ConfigurationModelABC):
|
||||
@@ -27,7 +26,7 @@ class DatabaseSettings(ConfigurationModelABC):
|
||||
self._host: Optional[str] = host
|
||||
self._port: Optional[int] = port
|
||||
self._user: Optional[str] = user
|
||||
self._password: Optional[str] = Base64.decode(password) if Base64.is_b64(password) else password
|
||||
self._password: Optional[str] = password
|
||||
self._database: Optional[str] = database
|
||||
self._charset: Optional[str] = charset
|
||||
self._use_unicode: Optional[bool] = use_unicode
|
||||
|
||||
@@ -5,8 +5,7 @@ from mysql.connector.abstracts import MySQLConnectionAbstract
|
||||
from mysql.connector.cursor import MySQLCursorBuffered
|
||||
|
||||
from cpl.database.abc.connection_abc import ConnectionABC
|
||||
from cpl.database.database_settings import DatabaseSettings
|
||||
from cpl.core.utils.credential_manager import CredentialManager
|
||||
from cpl.database.model.database_settings import DatabaseSettings
|
||||
|
||||
|
||||
class DatabaseConnection(ConnectionABC):
|
||||
@@ -31,7 +30,7 @@ class DatabaseConnection(ConnectionABC):
|
||||
host=settings.host,
|
||||
port=settings.port,
|
||||
user=settings.user,
|
||||
passwd=CredentialManager.decrypt(settings.password),
|
||||
passwd=settings.password,
|
||||
charset=settings.charset,
|
||||
use_unicode=settings.use_unicode,
|
||||
buffered=settings.buffered,
|
||||
@@ -43,7 +42,7 @@ class DatabaseConnection(ConnectionABC):
|
||||
host=settings.host,
|
||||
port=settings.port,
|
||||
user=settings.user,
|
||||
passwd=CredentialManager.decrypt(settings.password),
|
||||
passwd=settings.password,
|
||||
db=settings.database,
|
||||
charset=settings.charset,
|
||||
use_unicode=settings.use_unicode,
|
||||
|
||||
@@ -2,7 +2,6 @@ import ssl
|
||||
from smtplib import SMTP
|
||||
from typing import Optional
|
||||
|
||||
from cpl.core.utils.credential_manager import CredentialManager
|
||||
from cpl.mail.abc.email_client_abc import EMailClientABC
|
||||
from cpl.mail.email_client_settings import EMailClientSettings
|
||||
from cpl.mail.email_model import EMail
|
||||
@@ -62,9 +61,7 @@ class EMailClient(EMailClientABC):
|
||||
__name__,
|
||||
f"Try to login {self._mail_settings.user_name}@{self._mail_settings.host}:{self._mail_settings.port}",
|
||||
)
|
||||
self._server.login(
|
||||
self._mail_settings.user_name, CredentialManager.decrypt(self._mail_settings.credentials)
|
||||
)
|
||||
self._server.login(self._mail_settings.user_name, self._mail_settings.credentials)
|
||||
self._logger.info(
|
||||
__name__,
|
||||
f"Logged on as {self._mail_settings.user_name} to {self._mail_settings.host}:{self._mail_settings.port}",
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
from cpl.application.abc.application_abc import ApplicationABC
|
||||
from cpl.application.abc import ApplicationABC
|
||||
from cpl.auth.keycloak import KeycloakAdmin
|
||||
from cpl.core.console import Console
|
||||
from cpl.core.environment import Environment
|
||||
@@ -14,7 +14,7 @@ class Application(ApplicationABC):
|
||||
def __init__(self, services: ServiceProviderABC):
|
||||
ApplicationABC.__init__(self, services)
|
||||
|
||||
self._logger: LoggerABC = services.get_service(LoggerABC)
|
||||
self._logger = services.get_service(LoggerABC)
|
||||
|
||||
async def test_daos(self):
|
||||
userDao: UserDao = self._services.get_service(UserDao)
|
||||
|
||||
@@ -17,7 +17,7 @@
|
||||
"Host": "localhost",
|
||||
"User": "cpl",
|
||||
"Port": 3306,
|
||||
"Password": "Y3Bs",
|
||||
"Password": "cpl",
|
||||
"Database": "cpl",
|
||||
"Charset": "utf8mb4",
|
||||
"UseUnicode": "true",
|
||||
|
||||
@@ -8,6 +8,7 @@ def main():
|
||||
builder = ApplicationBuilder(Application).with_startup(Startup)
|
||||
app = builder.build()
|
||||
|
||||
app.with_logging()
|
||||
app.with_permissions(CustomPermissions)
|
||||
app.with_migrations("./scripts")
|
||||
app.with_seeders()
|
||||
|
||||
@@ -2,14 +2,18 @@ from application import Application
|
||||
from cpl.application import ApplicationBuilder
|
||||
from cpl.auth.permission.permissions_registry import PermissionsRegistry
|
||||
from cpl.core.console import Console
|
||||
from cpl.core.log import LogLevel
|
||||
from custom_permissions import CustomPermissions
|
||||
from startup import Startup
|
||||
|
||||
|
||||
def main():
|
||||
builder = ApplicationBuilder(Application).with_startup(Startup)
|
||||
builder.services.add_logging()
|
||||
|
||||
app = builder.build()
|
||||
|
||||
app.with_logging(LogLevel.trace)
|
||||
app.with_permissions(CustomPermissions)
|
||||
app.with_migrations("./scripts")
|
||||
app.with_seeders()
|
||||
|
||||
@@ -6,34 +6,30 @@ from cpl.core.utils import CredentialManager
|
||||
class CredentialManagerTestCase(unittest.TestCase):
|
||||
def setUp(self): ...
|
||||
|
||||
def test_encrypt(self):
|
||||
self.assertEqual("ZkVjSkplQUx4aW1zWHlPbA==", CredentialManager.encrypt("fEcJJeALximsXyOl"))
|
||||
self.assertEqual("QmtVd1l4dW5Sck9jRmVTQQ==", CredentialManager.encrypt("BkUwYxunRrOcFeSA"))
|
||||
self.assertEqual("c2FtaHF1VkNSdmZpSGxDcQ==", CredentialManager.encrypt("samhquVCRvfiHlCq"))
|
||||
self.assertEqual("S05aWHBPYW9DbkRSV01rWQ==", CredentialManager.encrypt("KNZXpOaoCnDRWMkY"))
|
||||
self.assertEqual("QmtUV0Zsb3h1Y254UkJWeg==", CredentialManager.encrypt("BkTWFloxucnxRBVz"))
|
||||
self.assertEqual("VFdNTkRuYXB1b1dndXNKdw==", CredentialManager.encrypt("TWMNDnapuoWgusJw"))
|
||||
self.assertEqual("WVRiQXVSZXRMblpicWNrcQ==", CredentialManager.encrypt("YTbAuRetLnZbqckq"))
|
||||
self.assertEqual("bmN4aExackxhYUVVdnV2VA==", CredentialManager.encrypt("ncxhLZrLaaEUvuvT"))
|
||||
self.assertEqual("dmpNT0J5U0lLQmFrc0pIYQ==", CredentialManager.encrypt("vjMOBySIKBaksJHa"))
|
||||
self.assertEqual("ZHd6WHFzSlFvQlhRbGtVZw==", CredentialManager.encrypt("dwzXqsJQoBXQlkUg"))
|
||||
self.assertEqual("Q0lmUUhOREtiUmxnY2VCbQ==", CredentialManager.encrypt("CIfQHNDKbRlgceBm"))
|
||||
def test_encrypt(self): ...
|
||||
|
||||
def test_decrypt(self):
|
||||
self.assertEqual("fEcJJeALximsXyOl", CredentialManager.decrypt("ZkVjSkplQUx4aW1zWHlPbA=="))
|
||||
self.assertEqual("BkUwYxunRrOcFeSA", CredentialManager.decrypt("QmtVd1l4dW5Sck9jRmVTQQ=="))
|
||||
self.assertEqual("samhquVCRvfiHlCq", CredentialManager.decrypt("c2FtaHF1VkNSdmZpSGxDcQ=="))
|
||||
self.assertEqual("KNZXpOaoCnDRWMkY", CredentialManager.decrypt("S05aWHBPYW9DbkRSV01rWQ=="))
|
||||
self.assertEqual("BkTWFloxucnxRBVz", CredentialManager.decrypt("QmtUV0Zsb3h1Y254UkJWeg=="))
|
||||
self.assertEqual("TWMNDnapuoWgusJw", CredentialManager.decrypt("VFdNTkRuYXB1b1dndXNKdw=="))
|
||||
self.assertEqual("YTbAuRetLnZbqckq", CredentialManager.decrypt("WVRiQXVSZXRMblpicWNrcQ=="))
|
||||
self.assertEqual("ncxhLZrLaaEUvuvT", CredentialManager.decrypt("bmN4aExackxhYUVVdnV2VA=="))
|
||||
self.assertEqual("vjMOBySIKBaksJHa", CredentialManager.decrypt("dmpNT0J5U0lLQmFrc0pIYQ=="))
|
||||
self.assertEqual("dwzXqsJQoBXQlkUg", CredentialManager.decrypt("ZHd6WHFzSlFvQlhRbGtVZw=="))
|
||||
self.assertEqual("CIfQHNDKbRlgceBm", CredentialManager.decrypt("Q0lmUUhOREtiUmxnY2VCbQ=="))
|
||||
# self.assertEqual("ZkVjSkplQUx4aW1zWHlPbA==", CredentialManager.encrypt("fEcJJeALximsXyOl"))
|
||||
# self.assertEqual("QmtVd1l4dW5Sck9jRmVTQQ==", CredentialManager.encrypt("BkUwYxunRrOcFeSA"))
|
||||
# self.assertEqual("c2FtaHF1VkNSdmZpSGxDcQ==", CredentialManager.encrypt("samhquVCRvfiHlCq"))
|
||||
# self.assertEqual("S05aWHBPYW9DbkRSV01rWQ==", CredentialManager.encrypt("KNZXpOaoCnDRWMkY"))
|
||||
# self.assertEqual("QmtUV0Zsb3h1Y254UkJWeg==", CredentialManager.encrypt("BkTWFloxucnxRBVz"))
|
||||
# self.assertEqual("VFdNTkRuYXB1b1dndXNKdw==", CredentialManager.encrypt("TWMNDnapuoWgusJw"))
|
||||
# self.assertEqual("WVRiQXVSZXRMblpicWNrcQ==", CredentialManager.encrypt("YTbAuRetLnZbqckq"))
|
||||
# self.assertEqual("bmN4aExackxhYUVVdnV2VA==", CredentialManager.encrypt("ncxhLZrLaaEUvuvT"))
|
||||
# self.assertEqual("dmpNT0J5U0lLQmFrc0pIYQ==", CredentialManager.encrypt("vjMOBySIKBaksJHa"))
|
||||
# self.assertEqual("ZHd6WHFzSlFvQlhRbGtVZw==", CredentialManager.encrypt("dwzXqsJQoBXQlkUg"))
|
||||
# self.assertEqual("Q0lmUUhOREtiUmxnY2VCbQ==", CredentialManager.encrypt("CIfQHNDKbRlgceBm"))
|
||||
|
||||
def test_build_string(self):
|
||||
self.assertEqual(
|
||||
"TestStringWithCredentialsfEcJJeALximsXyOlHere",
|
||||
CredentialManager.build_string("TestStringWithCredentials$credentialsHere", "ZkVjSkplQUx4aW1zWHlPbA=="),
|
||||
)
|
||||
def test_decrypt(self): ...
|
||||
|
||||
# self.assertEqual("fEcJJeALximsXyOl", CredentialManager.decrypt("ZkVjSkplQUx4aW1zWHlPbA=="))
|
||||
# self.assertEqual("BkUwYxunRrOcFeSA", CredentialManager.decrypt("QmtVd1l4dW5Sck9jRmVTQQ=="))
|
||||
# self.assertEqual("samhquVCRvfiHlCq", CredentialManager.decrypt("c2FtaHF1VkNSdmZpSGxDcQ=="))
|
||||
# self.assertEqual("KNZXpOaoCnDRWMkY", CredentialManager.decrypt("S05aWHBPYW9DbkRSV01rWQ=="))
|
||||
# self.assertEqual("BkTWFloxucnxRBVz", CredentialManager.decrypt("QmtUV0Zsb3h1Y254UkJWeg=="))
|
||||
# self.assertEqual("TWMNDnapuoWgusJw", CredentialManager.decrypt("VFdNTkRuYXB1b1dndXNKdw=="))
|
||||
# self.assertEqual("YTbAuRetLnZbqckq", CredentialManager.decrypt("WVRiQXVSZXRMblpicWNrcQ=="))
|
||||
# self.assertEqual("ncxhLZrLaaEUvuvT", CredentialManager.decrypt("bmN4aExackxhYUVVdnV2VA=="))
|
||||
# self.assertEqual("vjMOBySIKBaksJHa", CredentialManager.decrypt("dmpNT0J5U0lLQmFrc0pIYQ=="))
|
||||
# self.assertEqual("dwzXqsJQoBXQlkUg", CredentialManager.decrypt("ZHd6WHFzSlFvQlhRbGtVZw=="))
|
||||
# self.assertEqual("CIfQHNDKbRlgceBm", CredentialManager.decrypt("Q0lmUUhOREtiUmxnY2VCbQ=="))
|
||||
|
||||
Reference in New Issue
Block a user