sh_cpl/src/cpl/configuration/configuration.py

249 lines
9.8 KiB
Python
Raw Normal View History

import json
import os
import sys
2021-03-03 18:37:39 +01:00
from collections import Callable
2021-03-04 06:53:38 +01:00
from typing import Union, Type, Optional
2020-11-25 21:41:45 +01:00
2021-03-03 10:47:52 +01:00
from cpl.configuration.configuration_abc import ConfigurationABC
from cpl.configuration.configuration_model_abc import ConfigurationModelABC
from cpl.configuration.configuration_variable_name import ConfigurationVariableName
2021-03-03 18:37:39 +01:00
from cpl.configuration.console_argument import ConsoleArgument
2021-03-03 10:47:52 +01:00
from cpl.console.console import Console
from cpl.console.foreground_color import ForegroundColor
from cpl.environment.hosting_environment import HostingEnvironment
from cpl.environment.environment_abc import EnvironmentABC
from cpl.environment.environment_name import EnvironmentName
2020-11-25 21:41:45 +01:00
2021-03-03 10:47:52 +01:00
class Configuration(ConfigurationABC):
2020-11-25 21:41:45 +01:00
def __init__(self):
2021-03-03 10:47:52 +01:00
ConfigurationABC.__init__(self)
2020-11-25 21:41:45 +01:00
self._hosting_environment = HostingEnvironment()
2021-03-03 18:37:39 +01:00
self._config: dict[Union[type, str], Union[ConfigurationModelABC, str]] = {}
self._argument_types: list[ConsoleArgument] = []
self._additional_arguments: list[str] = []
2020-11-26 11:20:21 +01:00
2021-03-04 06:53:38 +01:00
self._argument_error_function: Optional[Callable] = None
2021-03-09 14:35:45 +01:00
self._is_multiple_args_allowed = False
@property
2021-03-03 10:47:52 +01:00
def environment(self) -> EnvironmentABC:
return self._hosting_environment
2020-11-25 21:41:45 +01:00
2021-03-03 18:37:39 +01:00
@property
def additional_arguments(self) -> list[str]:
return self._additional_arguments
2021-03-04 06:53:38 +01:00
@property
def argument_error_function(self) -> Optional[Callable]:
return self._argument_error_function
@argument_error_function.setter
def argument_error_function(self, argument_error_function: Callable):
self._argument_error_function = argument_error_function
2021-03-09 14:35:45 +01:00
def allow_multiple_args(self):
self._is_multiple_args_allowed = True
@staticmethod
def _print_info(name: str, message: str):
2020-12-14 21:10:26 +01:00
Console.set_foreground_color(ForegroundColor.green)
Console.write_line(f'[{name}] {message}')
Console.set_foreground_color(ForegroundColor.default)
@staticmethod
def _print_warn(name: str, message: str):
2020-12-14 21:10:26 +01:00
Console.set_foreground_color(ForegroundColor.yellow)
Console.write_line(f'[{name}] {message}')
Console.set_foreground_color(ForegroundColor.default)
@staticmethod
def _print_error(name: str, message: str):
2020-12-14 21:10:26 +01:00
Console.set_foreground_color(ForegroundColor.red)
Console.write_line(f'[{name}] {message}')
Console.set_foreground_color(ForegroundColor.default)
def _set_variable(self, name: str, value: str):
if name == ConfigurationVariableName.environment.value:
2020-11-29 17:29:16 +01:00
self._hosting_environment.environment_name = EnvironmentName(value)
elif name == ConfigurationVariableName.name.value:
self._hosting_environment.application_name = value
elif name == ConfigurationVariableName.customer.value:
self._hosting_environment.customer = value
2021-03-03 18:37:39 +01:00
else:
self._config[name] = value
def _validate_argument_child(self, argument: str, argument_type: ConsoleArgument, next_arguments: Optional[list[str]]) -> bool:
if argument_type.console_arguments is not None and len(argument_type.console_arguments) > 0:
found = False
for child_argument_type in argument_type.console_arguments:
found = self._validate_argument_by_argument_type(argument, child_argument_type, next_arguments[1:])
if not found:
raise Exception(f'Invalid argument: {argument}')
return found
return True
def _validate_argument_by_argument_type(self, argument: str, argument_type: ConsoleArgument, next_arguments: list[str] = None) -> bool:
if argument_type.token != '' and argument.startswith(argument_type.token) and (argument_type.name == argument.split(argument_type.token)[1] or argument in argument_type.aliases) and argument_type.value_token == '':
# --new
self._additional_arguments.append(argument_type.name)
if next_arguments is not None and len(next_arguments) > 0:
return self._validate_argument_child(next_arguments[0], argument_type, next_arguments)
return True
elif argument_type.token != '' and argument.startswith(argument_type.token) and (argument_type.name == argument.split(argument_type.token)[1] or argument in argument_type.aliases) and argument_type.value_token != '':
# --new=
if argument_type.value_token == ' ':
if next_arguments is None or len(next_arguments) == 0:
raise Exception(f'Invalid argument: {argument}')
value = next_arguments[0]
else:
value = argument.split(argument_type.value_token)[1]
self._set_variable(argument_type.name, value)
self._additional_arguments.append(argument_type.name)
if next_arguments is not None and len(next_arguments) > 0:
return self._validate_argument_child(next_arguments[0], argument_type, next_arguments)
return True
elif argument_type.token == '' and (argument.startswith(argument_type.name) or argument in argument_type.aliases) and argument_type.value_token == '':
# new
self._additional_arguments.append(argument_type.name)
if next_arguments is not None and len(next_arguments) > 0:
return self._validate_argument_child(next_arguments[0], argument_type, next_arguments)
return True
elif argument_type.token == '' and (argument.startswith(argument_type.name) or argument in argument_type.aliases) and argument_type.value_token != '':
# new=
value = ''
if argument_type.value_token == ' ':
if next_arguments is None or len(next_arguments) == 0:
raise Exception(f'Invalid argument: {argument}')
value = next_arguments[0]
else:
value = argument.split(argument_type.value_token)[1]
self._set_variable(argument_type.name, value)
self._additional_arguments.append(argument_type.name)
if next_arguments is not None and len(next_arguments) > 0:
return self._validate_argument_child(next_arguments[0], argument_type, next_arguments[1:])
return True
else:
return False
def add_environment_variables(self, prefix: str):
for variable in ConfigurationVariableName.to_list():
var_name = f'{prefix}{variable}'
if var_name in [key.upper() for key in os.environ.keys()]:
self._set_variable(variable, os.environ[var_name])
2021-03-09 14:35:45 +01:00
def add_console_argument(self, argument: ConsoleArgument):
self._argument_types.append(argument)
2021-03-03 18:37:39 +01:00
def add_console_arguments(self):
for arg_name in ConfigurationVariableName.to_list():
2021-03-09 14:35:45 +01:00
self.add_console_argument(ConsoleArgument('--', arg_name, [], ''))
2021-03-09 14:35:45 +01:00
arg_list = sys.argv[1:]
for i in range(0, len(arg_list)):
argument = arg_list[i]
next_arguments = []
error_message = ''
if i+1 < len(arg_list):
next_arguments = arg_list[i+1:]
found = False
for argument_type in self._argument_types:
try:
found = self._validate_argument_by_argument_type(argument, argument_type, next_arguments)
if found:
return
except Exception as e:
error_message = e
print('\nerror')
if not found and error_message == '':
error_message = f'Invalid argument: {argument}'
if self._argument_error_function is not None:
self._argument_error_function(error_message)
else:
self._print_error(__name__, error_message)
exit()
2021-03-04 19:06:16 +01:00
def add_json_file(self, name: str, optional: bool = None, output: bool = True):
if self._hosting_environment.content_root_path.endswith('/') and not name.startswith('/'):
file_path = f'{self._hosting_environment.content_root_path}{name}'
else:
file_path = f'{self._hosting_environment.content_root_path}/{name}'
if not os.path.isfile(file_path):
if not optional:
2021-03-04 18:12:51 +01:00
if output:
self._print_error(__name__, f'File not found: {file_path}')
exit()
2021-03-04 18:12:51 +01:00
if output:
self._print_warn(__name__, f'Not Loaded config file: {file_path}')
return None
2021-03-04 18:12:51 +01:00
config_from_file = self._load_json_file(file_path, output)
2021-03-03 10:47:52 +01:00
for sub in ConfigurationModelABC.__subclasses__():
for key, value in config_from_file.items():
if sub.__name__ == key:
configuration = sub()
configuration.from_dict(value)
self.add_configuration(sub, configuration)
2021-03-04 18:12:51 +01:00
def _load_json_file(self, file: str, output: bool) -> dict:
try:
# open config file, create if not exists
with open(file, encoding='utf-8') as cfg:
# load json
json_cfg = json.load(cfg)
2021-03-04 18:12:51 +01:00
if output:
self._print_info(__name__, f'Loaded config file: {file}')
return json_cfg
except Exception as e:
self._print_error(__name__, f'Cannot load config file: {file}! -> {e}')
return {}
2021-03-03 10:47:52 +01:00
def add_configuration(self, key_type: type, value: ConfigurationModelABC):
2020-11-25 21:41:45 +01:00
self._config[key_type] = value
2021-03-09 14:35:45 +01:00
def get_configuration(self, search_type: Union[str, Type[ConfigurationModelABC]]) -> Union[
str, Callable[ConfigurationModelABC]]:
2020-11-25 21:41:45 +01:00
if search_type not in self._config:
raise Exception(f'Config model by type {search_type} not found')
for config_model in self._config:
if config_model == search_type:
return self._config[config_model]