sh_cpl/src/cpl_core/configuration/configuration.py

388 lines
15 KiB
Python

import json
import os
import sys
import traceback
from collections.abc import Callable
from typing import Union, Type, Optional
from cpl_core.configuration.argument_abc import ArgumentABC
from cpl_core.configuration.argument_builder import ArgumentBuilder
from cpl_core.configuration.argument_executable_abc import ArgumentExecutableABC
from cpl_core.configuration.argument_type_enum import ArgumentTypeEnum
from cpl_core.configuration.configuration_abc import ConfigurationABC
from cpl_core.configuration.configuration_model_abc import ConfigurationModelABC
from cpl_core.configuration.configuration_variable_name_enum import (
ConfigurationVariableNameEnum,
)
from cpl_core.configuration.executable_argument import ExecutableArgument
from cpl_core.configuration.flag_argument import FlagArgument
from cpl_core.configuration.validator_abc import ValidatorABC
from cpl_core.configuration.variable_argument import VariableArgument
from cpl_core.console.console import Console
from cpl_core.console.foreground_color_enum import ForegroundColorEnum
from cpl_core.dependency_injection.service_provider_abc import ServiceProviderABC
from cpl_core.environment.application_environment import ApplicationEnvironment
from cpl_core.environment.application_environment_abc import ApplicationEnvironmentABC
from cpl_core.environment.environment_name_enum import EnvironmentNameEnum
from cpl_core.type import T
from cpl_core.utils.json_processor import JSONProcessor
class Configuration(ConfigurationABC):
def __init__(self):
r"""Representation of configuration"""
ConfigurationABC.__init__(self)
self._application_environment = ApplicationEnvironment()
self._config: dict[Union[type, str], Union[ConfigurationModelABC, str]] = {}
self._argument_types: list[ArgumentABC] = []
self._additional_arguments: list[str] = []
self._argument_error_function: Optional[Callable] = None
self._handled_args = []
@property
def environment(self) -> ApplicationEnvironmentABC:
return self._application_environment
@property
def additional_arguments(self) -> list[str]:
return self._additional_arguments
@property
def argument_error_function(self) -> Optional[Callable]:
return self._argument_error_function
@argument_error_function.setter
def argument_error_function(self, argument_error_function: Callable):
self._argument_error_function = argument_error_function
@property
def arguments(self) -> list[ArgumentABC]:
return self._argument_types
@staticmethod
def _print_info(name: str, message: str):
r"""Prints an info message
Parameter:
name: :class:`str`
Info name
message: :class:`str`
Info message
"""
Console.set_foreground_color(ForegroundColorEnum.green)
Console.write_line(f"[{name}] {message}")
Console.set_foreground_color(ForegroundColorEnum.default)
@staticmethod
def _print_warn(name: str, message: str):
r"""Prints a warning
Parameter:
name: :class:`str`
Warning name
message: :class:`str`
Warning message
"""
Console.set_foreground_color(ForegroundColorEnum.yellow)
Console.write_line(f"[{name}] {message}")
Console.set_foreground_color(ForegroundColorEnum.default)
@staticmethod
def _print_error(name: str, message: str):
r"""Prints an error
Parameter:
name: :class:`str`
Error name
message: :class:`str`
Error message
"""
Console.set_foreground_color(ForegroundColorEnum.red)
Console.write_line(f"[{name}] {message}")
Console.set_foreground_color(ForegroundColorEnum.default)
def _set_variable(self, name: str, value: any):
r"""Sets variable to given value
Parameter:
name: :class:`str`
Name of the variable
value: :class:`any`
Value of the variable
"""
if name == ConfigurationVariableNameEnum.environment.value:
self._application_environment.environment_name = EnvironmentNameEnum(value)
elif name == ConfigurationVariableNameEnum.name.value:
self._application_environment.application_name = value
elif name == ConfigurationVariableNameEnum.customer.value:
self._application_environment.customer = value
else:
self._config[name] = value
def _load_json_file(self, file: str, output: bool) -> dict:
r"""Reads the json file
Parameter:
file: :class:`str`
Name of the file
output: :class:`bool`
Specifies whether an output should take place
Returns:
Object of :class:`dict`
"""
try:
# open config file, create if not exists
with open(file, encoding="utf-8") as cfg:
# load json
json_cfg = json.load(cfg)
if output:
self._print_info(__name__, f"Loaded config file: {file}")
return json_cfg
except Exception as e:
self._print_error(__name__, f"Cannot load config file: {file}! -> {e}")
return {}
def _handle_pre_or_post_executables(self, pre: bool, argument: ExecutableArgument, services: ServiceProviderABC):
script_type = "pre-" if pre else "post-"
from cpl_cli.configuration.workspace_settings import WorkspaceSettings
workspace: Optional[WorkspaceSettings] = self.get_configuration(WorkspaceSettings)
if workspace is None or len(workspace.scripts) == 0:
return
for script in workspace.scripts:
if script_type not in script and not script.startswith(script_type):
continue
# split in two ifs to prevent exception
if script.split(script_type)[1] != argument.name:
continue
from cpl_cli.command.custom_script_service import CustomScriptService
css: CustomScriptService = services.get_service(CustomScriptService)
if css is None:
continue
Console.write_line()
self._set_variable("ACTIVE_EXECUTABLE", script)
css.run(self._additional_arguments)
def _parse_arguments(
self,
executables: list[ArgumentABC],
arg_list: list[str],
args_types: list[ArgumentABC],
):
for i in range(0, len(arg_list)):
arg_str = arg_list[i]
for n in range(0, len(args_types)):
arg = args_types[n]
arg_str_without_token = arg_str
if arg.token != "" and arg.token in arg_str:
arg_str_without_token = arg_str.split(arg.token)[1]
# executable
if isinstance(arg, ExecutableArgument):
if (
arg_str.startswith(arg.token)
and arg_str_without_token == arg.name
or arg_str_without_token in arg.aliases
):
executables.append(arg)
self._handled_args.append(arg_str)
self._parse_arguments(executables, arg_list[i + 1 :], arg.console_arguments)
# variables
elif isinstance(arg, VariableArgument):
arg_str_without_value = arg_str_without_token
if arg.value_token in arg_str_without_value:
arg_str_without_value = arg_str_without_token.split(arg.value_token)[0]
if (
arg_str.startswith(arg.token)
and arg_str_without_value == arg.name
or arg_str_without_value in arg.aliases
):
if arg.value_token != " ":
value = arg_str_without_token.split(arg.value_token)[1]
else:
value = arg_list[i + 1]
self._set_variable(arg.name, value)
self._handled_args.append(arg_str)
self._handled_args.append(value)
self._parse_arguments(executables, arg_list[i + 1 :], arg.console_arguments)
# flags
elif isinstance(arg, FlagArgument):
if (
arg_str.startswith(arg.token)
and arg_str_without_token == arg.name
or arg_str_without_token in arg.aliases
):
if arg_str in self._additional_arguments:
self._additional_arguments.remove(arg_str)
self._additional_arguments.append(arg.name)
self._handled_args.append(arg_str)
self._parse_arguments(executables, arg_list[i + 1 :], arg.console_arguments)
# add left over values to args
if arg_str not in self._additional_arguments and arg_str not in self._handled_args:
self._additional_arguments.append(arg_str)
def add_environment_variables(self, prefix: str):
for env_var in os.environ.keys():
if not env_var.startswith(prefix):
continue
self._set_variable(env_var.replace(prefix, ""), os.environ[env_var])
def add_console_argument(self, argument: ArgumentABC):
self._argument_types.append(argument)
def add_json_file(self, name: str, optional: bool = None, output: bool = True, path: str = None):
if os.path.isabs(name):
file_path = name
else:
path_root = self._application_environment.working_directory
if path is not None:
path_root = path
if str(path_root).endswith("/") and not name.startswith("/"):
file_path = f"{path_root}{name}"
else:
file_path = f"{path_root}/{name}"
if not os.path.isfile(file_path):
if optional is not True:
if output:
self._print_error(__name__, f"File not found: {file_path}")
sys.exit()
if output:
self._print_warn(__name__, f"Not Loaded config file: {file_path}")
return None
config_from_file = self._load_json_file(file_path, output)
for sub in ConfigurationModelABC.__subclasses__():
for key, value in config_from_file.items():
if sub.__name__ != key and sub.__name__.replace("Settings", "") != key:
continue
configuration = sub()
from_dict = getattr(configuration, "from_dict", None)
if from_dict is not None and not hasattr(from_dict, "is_base_func"):
Console.set_foreground_color(ForegroundColorEnum.yellow)
Console.write_line(
f"{sub.__name__}.from_dict is deprecated. Instead, set attributes as typed arguments in __init__. They can be None by default!"
)
Console.color_reset()
configuration.from_dict(value)
else:
configuration = JSONProcessor.process(sub, value)
self.add_configuration(sub, configuration)
def add_configuration(self, key_type: T, value: any):
self._config[key_type] = value
def create_console_argument(
self,
arg_type: ArgumentTypeEnum,
token: str,
name: str,
aliases: list[str],
*args,
**kwargs,
) -> ArgumentABC:
argument = ArgumentBuilder.build_argument(arg_type, token, name, aliases, *args, **kwargs)
self._argument_types.append(argument)
return argument
def for_each_argument(self, call: Callable):
for arg in self._argument_types:
call(arg)
def get_configuration(self, search_type: T) -> Optional[T]:
if type(search_type) is str:
if search_type == ConfigurationVariableNameEnum.environment.value:
return self._application_environment.environment_name
elif search_type == ConfigurationVariableNameEnum.name.value:
return self._application_environment.application_name
elif search_type == ConfigurationVariableNameEnum.customer.value:
return self._application_environment.customer
if search_type not in self._config:
return None
for config_model in self._config:
if config_model == search_type:
return self._config[config_model]
def parse_console_arguments(self, services: ServiceProviderABC, error: bool = None) -> bool:
# sets environment variables as possible arguments as: --VAR=VALUE
for arg_name in ConfigurationVariableNameEnum.to_list():
self.add_console_argument(VariableArgument("--", str(arg_name).upper(), [str(arg_name).lower()], "="))
success = False
try:
arg_list = sys.argv[1:]
executables: list[ExecutableArgument] = []
self._parse_arguments(executables, arg_list, self._argument_types)
except Exception as e:
Console.error("An error occurred while parsing arguments.")
sys.exit()
try:
prevent = False
for exe in executables:
if prevent:
continue
if exe.validators is not None:
abort = False
for validator_type in exe.validators:
validator: ValidatorABC = services.get_service(validator_type)
result = validator.validate()
abort = not result
if abort:
break
if abort:
sys.exit()
cmd: ArgumentExecutableABC = services.get_service(exe.executable_type)
self._handle_pre_or_post_executables(True, exe, services)
self._set_variable("ACTIVE_EXECUTABLE", exe.name)
args = self.get_configuration("ARGS")
if args is not None:
for arg in args.split(" "):
if arg == "":
continue
self._additional_arguments.append(arg)
cmd.run(self._additional_arguments)
self._handle_pre_or_post_executables(False, exe, services)
prevent = exe.prevent_next_executable
success = True
except Exception as e:
Console.error("An error occurred while executing arguments.", traceback.format_exc())
sys.exit()
return success