Improved argument parsing
This commit is contained in:
@@ -4,11 +4,15 @@ import sys
|
||||
from collections.abc import Callable
|
||||
from typing import Union, Type, Optional
|
||||
|
||||
from cpl_core.configuration.argument_abc import ArgumentABC
|
||||
from cpl_core.configuration.argument_builder import ArgumentBuilder
|
||||
from cpl_core.configuration.argument_type_enum import ArgumentTypeEnum
|
||||
from cpl_core.configuration.configuration_abc import ConfigurationABC
|
||||
from cpl_core.configuration.configuration_model_abc import ConfigurationModelABC
|
||||
from cpl_core.configuration.configuration_variable_name_enum import ConfigurationVariableNameEnum
|
||||
from cpl_core.configuration.console_argument import ConsoleArgument
|
||||
from cpl_core.configuration.runnable_argument_abc import RunnableArgumentABC
|
||||
from cpl_core.configuration.executable_argument import ExecutableArgument
|
||||
from cpl_core.configuration.flag_argument import FlagArgument
|
||||
from cpl_core.configuration.variable_argument import VariableArgument
|
||||
from cpl_core.console.console import Console
|
||||
from cpl_core.console.foreground_color_enum import ForegroundColorEnum
|
||||
from cpl_core.dependency_injection import ServiceProviderABC
|
||||
@@ -26,7 +30,7 @@ class Configuration(ConfigurationABC):
|
||||
self._application_environment = ApplicationEnvironment()
|
||||
self._config: dict[Union[type, str], Union[ConfigurationModelABC, str]] = {}
|
||||
|
||||
self._argument_types: list[ConsoleArgument] = []
|
||||
self._argument_types: list[ArgumentABC] = []
|
||||
self._additional_arguments: list[str] = []
|
||||
|
||||
self._argument_error_function: Optional[Callable] = None
|
||||
@@ -116,155 +120,6 @@ class Configuration(ConfigurationABC):
|
||||
else:
|
||||
self._config[name] = value
|
||||
|
||||
def _validate_argument_by_argument_type(self, argument: str, argument_type: ConsoleArgument,
|
||||
next_arguments: list[str] = None) -> bool:
|
||||
r"""Validate argument by argument type
|
||||
|
||||
Parameter
|
||||
---------
|
||||
argument: :class:`str`
|
||||
Command as string
|
||||
argument_type: :class:`cpl_core.configuration.console_argument.ConsoleArgument`
|
||||
Command type as ConsoleArgument
|
||||
next_arguments: list[:class:`str`]
|
||||
Following arguments of argument
|
||||
|
||||
Returns
|
||||
-------
|
||||
Object of :class:`bool`
|
||||
|
||||
Raises
|
||||
------
|
||||
Exception: An error occurred getting an argument for a command
|
||||
"""
|
||||
argument_name = ''
|
||||
value = ''
|
||||
result = False
|
||||
|
||||
if argument_type.value_token != '' and argument_type.value_token in argument:
|
||||
# ?new=value
|
||||
found = False
|
||||
for alias in argument_type.aliases:
|
||||
if alias in argument:
|
||||
found = True
|
||||
|
||||
if argument_type.name not in argument_name and not found:
|
||||
return False
|
||||
|
||||
if argument_type.is_value_token_optional is not None and argument_type.is_value_token_optional:
|
||||
if argument_type.name not in self._additional_arguments:
|
||||
self._additional_arguments.append(argument_type.name)
|
||||
result = True
|
||||
|
||||
if argument_type.token != '' and argument.startswith(argument_type.token):
|
||||
# --new=value
|
||||
if len(argument.split(argument_type.token)[1].split(argument_type.value_token)) == 0:
|
||||
raise Exception(f'Expected argument for command: {argument}')
|
||||
|
||||
argument_name = argument.split(argument_type.token)[1].split(argument_type.value_token)[0]
|
||||
else:
|
||||
# new=value
|
||||
argument_name = argument.split(argument_type.value_token)[1]
|
||||
|
||||
if argument_name == '':
|
||||
raise Exception(f'Expected argument for command: {argument_type.name}')
|
||||
|
||||
result = True
|
||||
|
||||
if argument_type.is_value_token_optional is True:
|
||||
is_valid = False
|
||||
|
||||
name_list = argument.split(argument_type.token)
|
||||
if len(name_list) > 1:
|
||||
value_list = name_list[1].split(argument_type.value_token)
|
||||
if len(value_list) > 1:
|
||||
is_valid = True
|
||||
value = argument.split(argument_type.token)[1].split(argument_type.value_token)[1]
|
||||
|
||||
if not is_valid:
|
||||
if argument_type.name not in self._additional_arguments:
|
||||
self._additional_arguments.append(argument_type.name)
|
||||
result = True
|
||||
else:
|
||||
value = argument.split(argument_type.token)[1].split(argument_type.value_token)[1]
|
||||
|
||||
if argument_name != argument_type.name and argument_name not in argument_type.aliases:
|
||||
return False
|
||||
|
||||
self._set_variable(argument_type.name, value)
|
||||
result = True
|
||||
|
||||
elif argument_type.value_token == ' ':
|
||||
# ?new value
|
||||
found = False
|
||||
for alias in argument_type.aliases:
|
||||
if alias == argument or f' {alias} ' == argument:
|
||||
found = True
|
||||
|
||||
if (argument_type.token != '' and f'{argument_type.token}{argument_type.name}' != argument
|
||||
or argument_type.name not in argument) and not found:
|
||||
return False
|
||||
|
||||
if (next_arguments is None or len(next_arguments) == 0) and \
|
||||
argument_type.is_value_token_optional is not True:
|
||||
raise Exception(f'Expected argument for command: {argument_type.name}')
|
||||
|
||||
if (next_arguments is None or len(next_arguments) == 0) and argument_type.is_value_token_optional is True:
|
||||
value = ''
|
||||
else:
|
||||
value = next_arguments[0]
|
||||
# next_arguments.remove(value)
|
||||
self._handled_args.append(value)
|
||||
|
||||
if argument_type.token != '' and argument.startswith(argument_type.token):
|
||||
# --new value
|
||||
argument_name = argument.split(argument_type.token)[1]
|
||||
else:
|
||||
# new value
|
||||
argument_name = argument
|
||||
|
||||
if argument_name != argument_type.name and argument_name not in argument_type.aliases:
|
||||
return False
|
||||
|
||||
if value == '':
|
||||
if argument_type.name not in self._additional_arguments:
|
||||
self._additional_arguments.append(argument_type.name)
|
||||
else:
|
||||
self._set_variable(argument_type.name, value)
|
||||
|
||||
result = True
|
||||
|
||||
elif argument_type.name == argument or argument in argument_type.aliases:
|
||||
# new
|
||||
self._additional_arguments.append(argument_type.name)
|
||||
result = True
|
||||
|
||||
if result:
|
||||
self._handled_args.append(argument)
|
||||
if next_arguments is not None and len(next_arguments) > 0:
|
||||
next_args = []
|
||||
if len(next_arguments) > 1:
|
||||
next_args = next_arguments[1:]
|
||||
|
||||
if argument_type.console_arguments is not None and len(argument_type.console_arguments) > 0:
|
||||
found_child = False
|
||||
for child_argument_type in argument_type.console_arguments:
|
||||
found_child = self._validate_argument_by_argument_type(
|
||||
next_arguments[0],
|
||||
child_argument_type,
|
||||
next_args
|
||||
)
|
||||
if found_child and child_argument_type.name not in self._additional_arguments:
|
||||
self._additional_arguments.append(child_argument_type.name)
|
||||
|
||||
if found_child:
|
||||
break
|
||||
|
||||
if not found_child:
|
||||
result = self._validate_argument_by_argument_type(next_arguments[0], argument_type, next_args)
|
||||
|
||||
return result
|
||||
|
||||
def _load_json_file(self, file: str, output: bool) -> dict:
|
||||
r"""Reads the json file
|
||||
|
||||
@@ -298,51 +153,65 @@ class Configuration(ConfigurationABC):
|
||||
if var_name in [key.upper() for key in os.environ.keys()]:
|
||||
self._set_variable(variable, os.environ[var_name])
|
||||
|
||||
def add_console_argument(self, argument: ConsoleArgument):
|
||||
def add_console_argument(self, argument: ArgumentABC):
|
||||
self._argument_types.append(argument)
|
||||
|
||||
def add_console_arguments(self, error: bool = None):
|
||||
for arg_name in ConfigurationVariableNameEnum.to_list():
|
||||
self.add_console_argument(ConsoleArgument('--', str(arg_name).upper(), [str(arg_name).lower()], '='))
|
||||
|
||||
arg_list = sys.argv[1:]
|
||||
def _parse_arguments(self, call_stack: list[Callable], arg_list: list[str], args_types: list[ArgumentABC]):
|
||||
for i in range(0, len(arg_list)):
|
||||
argument = arg_list[i]
|
||||
next_arguments = []
|
||||
error_message = ''
|
||||
arg_str = arg_list[i]
|
||||
Console.write_line('ARG_STR', arg_str)
|
||||
for arg in args_types:
|
||||
Console.write_line('ARG', arg.name, type(arg), len(arg.console_arguments))
|
||||
arg_str_without_token = arg_str
|
||||
if arg.token != "" and arg.token in arg_str:
|
||||
arg_str_without_token = arg_str.split(arg.token)[1]
|
||||
|
||||
if argument in self._handled_args:
|
||||
break
|
||||
# executable
|
||||
if isinstance(arg, ExecutableArgument):
|
||||
if arg_str.startswith(arg.token) \
|
||||
and arg_str_without_token == arg.name or arg_str_without_token in arg.aliases:
|
||||
Console.write_line('EXEC', arg.name)
|
||||
call_stack.append(arg.run)
|
||||
self._parse_arguments(call_stack, arg_list[i:], arg.console_arguments)
|
||||
|
||||
if i + 1 < len(arg_list):
|
||||
next_arguments = arg_list[i + 1:]
|
||||
# variables
|
||||
elif isinstance(arg, VariableArgument):
|
||||
arg_str_without_value = arg_str_without_token
|
||||
if arg.value_token in arg_str_without_value:
|
||||
arg_str_without_value = arg_str_without_token.split(arg.value_token)[0]
|
||||
|
||||
found = False
|
||||
for argument_type in self._argument_types:
|
||||
try:
|
||||
found = self._validate_argument_by_argument_type(argument, argument_type, next_arguments)
|
||||
if found:
|
||||
break
|
||||
except Exception as e:
|
||||
error_message = e
|
||||
if arg_str.startswith(arg.token) \
|
||||
and arg_str_without_value == arg.name or arg_str_without_value in arg.aliases:
|
||||
if arg.value_token != ' ':
|
||||
value = arg_str_without_token.split(arg.value_token)[1]
|
||||
else:
|
||||
value = arg_list[i + 1]
|
||||
self._set_variable(arg.name, value)
|
||||
self._parse_arguments(call_stack, arg_list[i + 1:], arg.console_arguments)
|
||||
# flags
|
||||
elif isinstance(arg, FlagArgument):
|
||||
if arg_str.startswith(arg.token) \
|
||||
and arg_str_without_token == arg.name or arg_str_without_token in arg.aliases:
|
||||
self._additional_arguments.append(arg.name)
|
||||
self._parse_arguments(call_stack, arg_list[i + 1:], arg.console_arguments)
|
||||
|
||||
if not found and error_message == '' and error is not False:
|
||||
error_message = f'Invalid argument: {argument}'
|
||||
Console.write_line()
|
||||
|
||||
if error_message != '':
|
||||
if self._argument_error_function is not None:
|
||||
self._argument_error_function(error_message)
|
||||
else:
|
||||
self._print_error(__name__, error_message)
|
||||
def parse_console_arguments(self, error: bool = None):
|
||||
# sets environment variables as possible arguments as: --VAR=VALUE
|
||||
for arg_name in ConfigurationVariableNameEnum.to_list():
|
||||
self.add_console_argument(VariableArgument('--', str(arg_name).upper(), [str(arg_name).lower()], '='))
|
||||
|
||||
sys.exit()
|
||||
Console.error('Parsing arguments:')
|
||||
arg_list = sys.argv[1:]
|
||||
call_stack = []
|
||||
self._parse_arguments(call_stack, arg_list, self._argument_types)
|
||||
|
||||
add_args = []
|
||||
for next_arg in next_arguments:
|
||||
if next_arg not in self._handled_args and next_arg not in self._additional_arguments:
|
||||
add_args.append(next_arg)
|
||||
Console.error('Parsing finished')
|
||||
|
||||
self._set_variable(f'{argument}AdditionalArguments', add_args)
|
||||
for call in call_stack:
|
||||
Console.write_line(call)
|
||||
call(self._additional_arguments)
|
||||
|
||||
def add_json_file(self, name: str, optional: bool = None, output: bool = True, path: str = None):
|
||||
if os.path.isabs(name):
|
||||
@@ -380,11 +249,10 @@ class Configuration(ConfigurationABC):
|
||||
def add_configuration(self, key_type: Union[str, type], value: ConfigurationModelABC):
|
||||
self._config[key_type] = value
|
||||
|
||||
def create_console_argument(self, token: str, name: str, aliases: list[str], value_token: str,
|
||||
is_value_token_optional: bool = None,
|
||||
runnable: Type[RunnableArgumentABC] = None) -> ConsoleArgument:
|
||||
argument = ConsoleArgument(token, name, aliases, value_token, is_value_token_optional, runnable)
|
||||
self.add_console_argument(argument)
|
||||
def create_console_argument(self, arg_type: ArgumentTypeEnum, token: str, name: str, aliases: list[str],
|
||||
*args, **kwargs) -> ArgumentABC:
|
||||
argument = ArgumentBuilder.build_argument(arg_type, token, name, aliases, *args, *kwargs)
|
||||
self._argument_types.append(argument)
|
||||
return argument
|
||||
|
||||
def get_configuration(self, search_type: Union[str, Type[ConfigurationModelABC]]) -> \
|
||||
@@ -408,4 +276,5 @@ class Configuration(ConfigurationABC):
|
||||
|
||||
def resolve_runnable_argument_types(self, services: ServiceProviderABC):
|
||||
for arg in self._argument_types:
|
||||
arg.set_runnable(services.get_service(arg.runnable_type))
|
||||
if isinstance(arg, ExecutableArgument):
|
||||
arg.set_executable(services.get_service(arg.executable_type))
|
||||
|
Reference in New Issue
Block a user