import datetime
from abc import ABC, abstractmethod
from enum import Enum
from types import NoneType
from typing import Generic, Optional, Union, Type, List, Any

from cpl.core.typing import T, Id
from cpl.core.utils.get_value import get_value
from cpl.core.utils.string import String
from cpl.database.abc.db_context_abc import DBContextABC
from cpl.database.const import DATETIME_FORMAT
from cpl.database.logger import DBLogger
from cpl.database.external_data_temp_table_builder import ExternalDataTempTableBuilder
from cpl.database.postgres.sql_select_builder import SQLSelectBuilder
from cpl.database.typing import T_DBM, Attribute, AttributeFilters, AttributeSorts


class DataAccessObjectABC(ABC, Generic[T_DBM]):
    """
    Base class for data access objects that map a model type to a database table.

    Subclasses register their attributes via :meth:`attribute` and their relations via
    :meth:`reference`. Queries are assembled with ``SQLSelectBuilder``; write statements
    are built as SQL strings and executed through the globally registered ``DBContextABC``.
    """

    @abstractmethod
    def __init__(self, model_type: Type[T_DBM], table_name: str):
        """
        :param Type[T_DBM] model_type: Model class instantiated by :meth:`to_object`
        :param str table_name: Name of the database table handled by this DAO
        """
        # Imported lazily inside the constructor (kept as in the original code).
        from cpl.dependency.service_provider_abc import ServiceProviderABC

        self._db = ServiceProviderABC.get_global_service(DBContextABC)
        self._logger = ServiceProviderABC.get_global_service(DBLogger)
        self._model_type = model_type
        self._table_name = table_name

        self._default_filter_condition = None

        # attribute name -> python type used to cast database values
        self.__attributes: dict[str, type] = {}
        # attribute name / db name / alias -> db column name
        self.__db_names: dict[str, str] = {}
        # attribute name -> (foreign table name, join condition)
        self.__foreign_tables: dict[str, tuple[str, str]] = {}
        # attribute name -> foreign key column in this table
        self.__foreign_table_keys: dict[str, str] = {}
        # attribute name -> DAO of the referenced table
        self.__foreign_dao: dict[str, "DataAccessObjectABC"] = {}
        self.__date_attributes: set[str] = set()
        self.__ignored_attributes: set[str] = set()

        self.__primary_key = "id"
        self.__primary_key_type = int
        self._external_fields: dict[str, ExternalDataTempTableBuilder] = {}

    @property
    def table_name(self) -> str:
        """Name of the database table handled by this DAO."""
        return self._table_name

    def has_attribute(self, attr_name: Attribute) -> bool:
        """
        Check if the attribute exists in the DAO
        :param Attribute attr_name: Name of the attribute
        :return: True if the attribute exists, False otherwise
        """
        return attr_name in self.__attributes

    def attribute(
        self,
        attr_name: Attribute,
        attr_type: type,
        db_name: str = None,
        ignore=False,
        primary_key=False,
        aliases: list[str] = None,
    ):
        """
        Add an attribute for db and object mapping to the data access object
        :param Attribute attr_name: Name of the attribute in the object
        :param type attr_type: Python type of the attribute to cast db value to
        :param str db_name: Name of the field in the database, if None the attribute lowered attr_name without "_" is used
        :param bool ignore: Defines if field is ignored for create and update (for e.g. auto increment fields or created/updated fields)
        :param bool primary_key: Defines if field is the primary key
        :param list[str] aliases: List of aliases for the attribute name
        :raises ValueError: If an alias is already registered
        :return:
        """
        if isinstance(attr_name, property):
            attr_name = attr_name.fget.__name__

        self.__attributes[attr_name] = attr_type
        if ignore:
            self.__ignored_attributes.add(attr_name)

        if not db_name:
            db_name = attr_name.lower().replace("_", "")

        # Both the attribute name and the db name resolve to the db name.
        self.__db_names[attr_name] = db_name
        self.__db_names[db_name] = db_name

        if aliases is not None:
            for alias in aliases:
                if alias in self.__db_names:
                    raise ValueError(f"Alias {alias} already exists")
                self.__db_names[alias] = db_name

        if primary_key:
            self.__primary_key = db_name
            self.__primary_key_type = attr_type

        if attr_type in [datetime, datetime.datetime]:
            self.__date_attributes.add(attr_name)
            self.__date_attributes.add(db_name)

    def reference(
        self,
        attr: Attribute,
        primary_attr: Attribute,
        foreign_attr: Attribute,
        table_name: str,
        reference_dao: "DataAccessObjectABC" = None,
    ):
        """
        Add a reference to another table for the given attribute
        :param Attribute attr: Name of the attribute in the object
        :param str primary_attr: Name of the primary key in the foreign object
        :param str foreign_attr: Name of the foreign key in the object
        :param str table_name: Name of the table to reference
        :param DataAccessObjectABC reference_dao: The data access object for the referenced table
        :return:
        """
        if isinstance(attr, property):
            attr = attr.fget.__name__

        if isinstance(primary_attr, property):
            primary_attr = primary_attr.fget.__name__
        primary_attr = primary_attr.lower().replace("_", "")

        if isinstance(foreign_attr, property):
            foreign_attr = foreign_attr.fget.__name__
        foreign_attr = foreign_attr.lower().replace("_", "")

        self.__foreign_table_keys[attr] = foreign_attr
        if reference_dao is not None:
            self.__foreign_dao[attr] = reference_dao

        # Self references are not registered as a join.
        if table_name == self._table_name:
            return

        self.__foreign_tables[attr] = (
            table_name,
            f"{table_name}.{primary_attr} = {self._table_name}.{foreign_attr}",
        )

    def use_external_fields(self, builder: ExternalDataTempTableBuilder):
        """
        Register a temp table builder providing external fields, keyed by its table name
        :param ExternalDataTempTableBuilder builder: Builder of the external temp table
        :return:
        """
        self._external_fields[builder.table_name] = builder

    def to_object(self, result: dict) -> T_DBM:
        """
        Convert a result from the database to an object
        :param dict result: Result from the database
        :return: Instance of the model type built from the mapped columns
        """
        # Reverse lookup db name -> first registered name, built once instead of scanning per column.
        attr_by_db_name: dict[str, str] = {}
        for name, mapped_db_name in self.__db_names.items():
            attr_by_db_name.setdefault(mapped_db_name, name)

        value_map: dict[str, Any] = {}
        for db_name, value in result.items():
            attr_name = attr_by_db_name.get(db_name)
            if not attr_name:
                # Column is not mapped to any attribute
                continue

            value_map[attr_name] = self._get_value_from_sql(self.__attributes[attr_name], value)

        return self._model_type(**value_map)

    def to_dict(self, obj: T_DBM) -> dict:
        """
        Convert an object to a dictionary
        :param T_DBM obj: Object to convert
        :return: Dictionary of attribute names (and external field names) to values
        """
        value_map: dict[str, Any] = {}

        for attr_name in self.__attributes:
            value = getattr(obj, attr_name)
            if isinstance(value, datetime.datetime):
                value = value.strftime(DATETIME_FORMAT)
            elif isinstance(value, Enum):
                value = value.value

            value_map[attr_name] = value

        for ex_field in self._external_fields.values():
            for ex_attr in ex_field.fields:
                if ex_attr == self.__primary_key:
                    continue

                value_map[ex_attr] = getattr(obj, ex_attr, None)

        return value_map

    async def count(self, filters: AttributeFilters = None) -> int:
        """
        Count the entries matching the given filters
        :param AttributeFilters filters: Conditions to filter the query
        :return: Number of matching entries, 0 if the query returned nothing
        """
        result = await self._prepare_query(filters=filters, for_count=True)
        return result[0]["count"] if result else 0

    async def get_history(
        self,
        entry_id: int,
        by_key: str = None,
        when: Optional[datetime.datetime] = None,
        until: Optional[datetime.datetime] = None,
        without_deleted: bool = False,
    ) -> list[T_DBM]:
        """
        Retrieve the history of an entry from the history table.
        :param entry_id: The ID of the entry to retrieve history for.
        :param by_key: The key to filter by (default is the primary key).
        :param when: A specific timestamp to filter the history.
        :param until: A timestamp to filter history entries up to a certain point.
        :param without_deleted: Exclude deleted entries if True.
        :return: A list of historical entries as objects.
        """
        f_tables = list(self.__foreign_tables.keys())

        history_table = f"{self._table_name}_history"
        builder = SQLSelectBuilder(history_table, self.__primary_key)

        builder.with_attribute("*")
        builder.with_value_condition(
            f"{history_table}.{by_key or self.__primary_key}",
            "=",
            str(entry_id),
            f_tables,
        )

        if self._default_filter_condition:
            builder.with_condition(self._default_filter_condition, "", f_tables)

        if without_deleted:
            builder.with_value_condition(f"{history_table}.deleted", "=", "false", f_tables)

        if when:
            builder.with_value_condition(
                self._attr_from_date_to_char(f"{history_table}.updated"),
                "=",
                f"'{when.strftime(DATETIME_FORMAT)}'",
                f_tables,
            )

        if until:
            builder.with_value_condition(
                self._attr_from_date_to_char(f"{history_table}.updated"),
                "<=",
                f"'{until.strftime(DATETIME_FORMAT)}'",
                f_tables,
            )

        builder.with_order_by(f"{history_table}.updated", "DESC")

        query = await builder.build()
        result = await self._db.select_map(query)
        return [self.to_object(x) for x in result] if result else []

    async def get_all(self) -> List[T_DBM]:
        """
        Get all entries ordered ascending by primary key
        :return: List of objects, empty if nothing was found
        """
        result = await self._prepare_query(sorts=[{self.__primary_key: "asc"}])
        return [self.to_object(x) for x in result] if result else []

    async def get_by_id(self, id: Union[int, str]) -> Optional[T_DBM]:
        """
        Get the entry with the given primary key value
        :param id: Primary key value
        :return: The object or None if nothing was found
        """
        result = await self._prepare_query(filters=[{self.__primary_key: id}], sorts=[{self.__primary_key: "asc"}])
        return self.to_object(result[0]) if result else None

    async def find_by_id(self, id: Union[int, str]) -> Optional[T_DBM]:
        """
        Find the entry with the given primary key value
        :param id: Primary key value
        :return: The object or None if nothing was found
        """
        result = await self._prepare_query(filters=[{self.__primary_key: id}], sorts=[{self.__primary_key: "asc"}])
        return self.to_object(result[0]) if result else None

    async def get_by(
        self,
        filters: AttributeFilters = None,
        sorts: AttributeSorts = None,
        take: int = None,
        skip: int = None,
    ) -> list[T_DBM]:
        """
        Get all entries matching the given filters
        :param filters: Conditions to filter the query.
        :param sorts: Sorting attributes and directions.
        :param take: Limit the number of results.
        :param skip: Offset the results.
        :raises ValueError: If no entry was found
        :return: List of objects
        """
        result = await self._prepare_query(filters, sorts, take, skip)
        if not result:
            raise ValueError("No result found")
        return [self.to_object(x) for x in result]

    async def get_single_by(
        self,
        filters: AttributeFilters = None,
        sorts: AttributeSorts = None,
        take: int = None,
        skip: int = None,
    ) -> T_DBM:
        """
        Get exactly one entry matching the given filters
        :param filters: Conditions to filter the query.
        :param sorts: Sorting attributes and directions.
        :param take: Limit the number of results.
        :param skip: Offset the results.
        :raises ValueError: If no entry or more than one entry was found
        :return: The object
        """
        result = await self._prepare_query(filters, sorts, take, skip)
        if not result:
            raise ValueError("No result found")
        if len(result) > 1:
            raise ValueError("More than one result found")
        return self.to_object(result[0])

    async def find_by(
        self,
        filters: AttributeFilters = None,
        sorts: AttributeSorts = None,
        take: int = None,
        skip: int = None,
    ) -> list[T_DBM]:
        """
        Find all entries matching the given filters
        :param filters: Conditions to filter the query.
        :param sorts: Sorting attributes and directions.
        :param take: Limit the number of results.
        :param skip: Offset the results.
        :return: List of objects, empty if nothing was found
        """
        result = await self._prepare_query(filters, sorts, take, skip)
        return [self.to_object(x) for x in result] if result else []

    async def find_single_by(
        self,
        filters: AttributeFilters = None,
        sorts: AttributeSorts = None,
        take: int = None,
        skip: int = None,
    ) -> Optional[T_DBM]:
        """
        Find at most one entry matching the given filters
        :param filters: Conditions to filter the query.
        :param sorts: Sorting attributes and directions.
        :param take: Limit the number of results.
        :param skip: Offset the results.
        :raises ValueError: If more than one entry was found
        :return: The object or None if nothing was found
        """
        result = await self._prepare_query(filters, sorts, take, skip)
        # Check for "nothing found" first so a None result does not break len()
        if not result:
            return None
        if len(result) > 1:
            raise ValueError("More than one result found")
        return self.to_object(result[0])

    async def touch(self, obj: T_DBM):
        """
        Touch the entry to update the last updated date
        :param T_DBM obj: Object whose entry is touched
        :return:
        """
        await self._db.execute(
            f"""
            UPDATE {self._table_name}
            SET updated = NOW()
            WHERE {self.__primary_key} = {self._get_primary_key_value_sql(obj)};
            """
        )

    async def touch_many_by_id(self, ids: list[Id]):
        """
        Touch the entries to update the last updated date
        :param list[Id] ids: Primary key values of the entries to touch
        :return:
        """
        if len(ids) == 0:
            return

        # _get_value_sql quotes string ids; other ids are rendered with str() as before
        id_list = ", ".join([self._get_value_sql(x) for x in ids])
        await self._db.execute(
            f"""
            UPDATE {self._table_name}
            SET updated = NOW()
            WHERE {self.__primary_key} IN ({id_list});
            """
        )

    async def _build_create_statement(self, obj: T_DBM, skip_editor=False) -> str:
        """
        Build the INSERT statement for the given object
        :param T_DBM obj: Object to insert
        :param bool skip_editor: If True the EditorId column is not written
        :return: INSERT statement returning the primary key
        """
        allowed_fields = [x for x in self.__attributes.keys() if x not in self.__ignored_attributes]

        columns = [self.__db_names[x] for x in allowed_fields]
        column_values = [self._get_value_sql(getattr(obj, x)) for x in allowed_fields]
        if not skip_editor:
            columns.insert(0, "EditorId")
            column_values.insert(0, f"{await self._get_editor_id(obj)}")

        fields = ", ".join(columns)
        values = ", ".join(column_values)

        return f"""
            INSERT INTO {self._table_name} (
                {fields}
            ) VALUES (
                {values}
            )
            RETURNING {self.__primary_key};
        """

    async def create(self, obj: T_DBM, skip_editor=False) -> int:
        """
        Insert the given object
        :param T_DBM obj: Object to insert
        :param bool skip_editor: If True the EditorId column is not written
        :return: Primary key of the created entry cast to the primary key type
        """
        self._logger.debug(f"create {type(obj).__name__} {obj.__dict__}")

        result = await self._db.execute(await self._build_create_statement(obj, skip_editor))
        return self._get_value_from_sql(self.__primary_key_type, result[0][0])

    async def create_many(self, objs: list[T_DBM], skip_editor=False) -> list[int]:
        """
        Insert the given objects with one combined query
        :param list[T_DBM] objs: Objects to insert
        :param bool skip_editor: If True the EditorId column is not written
        :return: Primary keys taken from the first column of each returned row
        """
        if len(objs) == 0:
            return []
        self._logger.debug(f"create many {type(objs[0]).__name__} {len(objs)} {[x.__dict__ for x in objs]}")

        query = "".join([await self._build_create_statement(obj, skip_editor) for obj in objs])

        result = await self._db.execute(query)
        return [self._get_value_from_sql(self.__primary_key_type, x[0]) for x in result]

    async def _build_update_statement(self, obj: T_DBM, skip_editor=False) -> str:
        """
        Build the UPDATE statement for the given object
        :param T_DBM obj: Object to update
        :param bool skip_editor: If True the EditorId column is not written
        :return: UPDATE statement filtered by primary key
        """
        allowed_fields = [x for x in self.__attributes.keys() if x not in self.__ignored_attributes]

        assignments = [
            f"{self.__db_names[x]} = {self._get_value_sql(getattr(obj, x, None))}" for x in allowed_fields
        ]
        if not skip_editor:
            assignments.insert(0, f"EditorId = {await self._get_editor_id(obj)}")

        fields = ", ".join(assignments)

        return f"""
            UPDATE {self._table_name}
            SET {fields}
            WHERE {self.__primary_key} = {self._get_primary_key_value_sql(obj)};
        """

    async def update(self, obj: T_DBM, skip_editor=False):
        """
        Update the entry of the given object
        :param T_DBM obj: Object to update
        :param bool skip_editor: If True the EditorId column is not written
        :return:
        """
        self._logger.debug(f"update {type(obj).__name__} {obj.__dict__}")
        await self._db.execute(await self._build_update_statement(obj, skip_editor))

    async def update_many(self, objs: list[T_DBM], skip_editor=False):
        """
        Update the entries of the given objects with one combined query
        :param list[T_DBM] objs: Objects to update
        :param bool skip_editor: If True the EditorId column is not written
        :return:
        """
        if len(objs) == 0:
            return
        self._logger.debug(f"update many {type(objs[0]).__name__} {len(objs)} {[x.__dict__ for x in objs]}")

        query = "".join([await self._build_update_statement(obj, skip_editor) for obj in objs])

        await self._db.execute(query)

    async def _build_delete_statement(self, obj: T_DBM, hard_delete: bool = False) -> str:
        """
        Build the delete statement for the given object
        :param T_DBM obj: Object to delete
        :param bool hard_delete: If True a DELETE is built, otherwise the entry is flagged as deleted
        :return: DELETE or UPDATE statement filtered by primary key
        """
        if hard_delete:
            return f"""
                DELETE FROM {self._table_name}
                WHERE {self.__primary_key} = {self._get_primary_key_value_sql(obj)};
            """

        return f"""
            UPDATE {self._table_name}
            SET EditorId = {await self._get_editor_id(obj)},
                Deleted = true
            WHERE {self.__primary_key} = {self._get_primary_key_value_sql(obj)};
        """

    async def delete(self, obj: T_DBM, hard_delete: bool = False):
        """
        Delete the entry of the given object
        :param T_DBM obj: Object to delete
        :param bool hard_delete: If True the row is removed, otherwise it is flagged as deleted
        :return:
        """
        self._logger.debug(f"delete {type(obj).__name__} {obj.__dict__}")
        await self._db.execute(await self._build_delete_statement(obj, hard_delete))

    async def delete_many(self, objs: list[T_DBM], hard_delete: bool = False):
        """
        Delete the entries of the given objects with one combined query
        :param list[T_DBM] objs: Objects to delete
        :param bool hard_delete: If True the rows are removed, otherwise they are flagged as deleted
        :return:
        """
        if len(objs) == 0:
            return
        self._logger.debug(f"delete many {type(objs[0]).__name__} {len(objs)} {[x.__dict__ for x in objs]}")

        query = "".join([await self._build_delete_statement(obj, hard_delete) for obj in objs])

        await self._db.execute(query)

    async def _build_restore_statement(self, obj: T_DBM) -> str:
        """
        Build the statement that resets the deleted flag of the given object
        :param T_DBM obj: Object to restore
        :return: UPDATE statement filtered by primary key
        """
        return f"""
            UPDATE {self._table_name}
            SET EditorId = {await self._get_editor_id(obj)},
                Deleted = false
            WHERE {self.__primary_key} = {self._get_primary_key_value_sql(obj)};
        """

    async def restore(self, obj: T_DBM):
        """
        Restore the soft deleted entry of the given object
        :param T_DBM obj: Object to restore
        :return:
        """
        self._logger.debug(f"restore {type(obj).__name__} {obj.__dict__}")
        await self._db.execute(await self._build_restore_statement(obj))

    async def restore_many(self, objs: list[T_DBM]):
        """
        Restore the soft deleted entries of the given objects with one combined query
        :param list[T_DBM] objs: Objects to restore
        :return:
        """
        if len(objs) == 0:
            return
        self._logger.debug(f"restore many {type(objs[0]).__name__} {len(objs)} {objs[0].__dict__}")

        query = "".join([await self._build_restore_statement(obj) for obj in objs])

        await self._db.execute(query)

    async def _prepare_query(
        self,
        filters: AttributeFilters = None,
        sorts: AttributeSorts = None,
        take: int = None,
        skip: int = None,
        for_count=False,
    ) -> list[dict]:
        """
        Prepares and executes a query using the SQLBuilder with the given parameters.
        :param filters: Conditions to filter the query.
        :param sorts: Sorting attributes and directions.
        :param take: Limit the number of results.
        :param skip: Offset the results.
        :param for_count: If True "COUNT(*) as count" is selected instead of "*".
        :return: Query result as a list of dictionaries.
        """
        external_table_deps = []
        builder = SQLSelectBuilder(self._table_name, self.__primary_key)

        for temp in self._external_fields:
            builder.with_temp_table(self._external_fields[temp])

        if for_count:
            builder.with_attribute("COUNT(*) as count", ignore_table_name=True)
        else:
            builder.with_attribute("*")

        for attr in self.__foreign_tables:
            table, join_condition = self.__foreign_tables[attr]
            builder.with_left_join(table, join_condition)

        if filters:
            await self._build_conditions(builder, filters, external_table_deps)

        if sorts:
            self._build_sorts(builder, sorts, external_table_deps)

        if take:
            builder.with_limit(take)

        if skip:
            builder.with_offset(skip)

        # Only temp tables that filters or sorts actually referenced are used
        for external_table in external_table_deps:
            builder.use_temp_table(external_table)

        query = await builder.build()
        return await self._db.select_map(query)

    async def _build_conditions(
        self,
        builder: SQLSelectBuilder,
        filters: AttributeFilters,
        external_table_deps: list[str],
    ):
        """
        Builds SQL conditions from GraphQL-like filters and adds them to the SQLBuilder.
        :param builder: The SQLBuilder instance to add conditions to.
        :param filters: GraphQL-like filter structure.
        :param external_table_deps: List to store external table dependencies.
        """
        if not isinstance(filters, list):
            filters = [filters]

        # Table names referenced by the registered foreign DAOs; invariant for all conditions
        foreign_dao_tables = [
            x[0] for fdao in self.__foreign_dao for x in self.__foreign_dao[fdao].__foreign_tables.values()
        ]

        for filter_group in filters:
            sql_conditions = self._graphql_to_sql_conditions(filter_group, external_table_deps)
            for attr, operator, value in sql_conditions:
                if attr in self.__foreign_table_keys:
                    attr = self.__foreign_table_keys[attr]

                recursive_join = self._get_recursive_reference_join(attr)
                if recursive_join is not None:
                    builder.with_left_join(*recursive_join)

                external_table = self._get_external_field_key(attr)
                if external_table is not None:
                    external_table_deps.append(external_table)

                if operator == "fuzzy":
                    builder.with_levenshtein_condition(attr)
                elif operator in ["IS NULL", "IS NOT NULL"]:
                    # operator without value
                    builder.with_condition(attr, operator, foreign_dao_tables)
                else:
                    if attr in self.__date_attributes or String.to_snake_case(attr) in self.__date_attributes:
                        attr = self._attr_from_date_to_char(f"{self._table_name}.{attr}")

                    builder.with_value_condition(
                        attr,
                        operator,
                        self._get_value_sql(value),
                        foreign_dao_tables,
                    )

    def _graphql_to_sql_conditions(
        self, graphql_structure: dict, external_table_deps: list[str]
    ) -> list[tuple[str, str, Any]]:
        """
        Converts a GraphQL-like structure to SQL conditions.
        :param graphql_structure: The GraphQL-like filter structure.
        :param external_table_deps: List to track external table dependencies.
        :return: A list of tuples (attribute, operator, value).
        """

        operators = {
            "equal": "=",
            "notEqual": "!=",
            "greater": ">",
            "greaterOrEqual": ">=",
            "less": "<",
            "lessOrEqual": "<=",
            "isNull": "IS NULL",
            "isNotNull": "IS NOT NULL",
            "contains": "LIKE",  # Special handling in _graphql_to_sql_conditions
            "notContains": "NOT LIKE",  # Special handling in _graphql_to_sql_conditions
            "startsWith": "LIKE",  # Special handling in _graphql_to_sql_conditions
            "endsWith": "LIKE",  # Special handling in _graphql_to_sql_conditions
            "in": "IN",
            "notIn": "NOT IN",
        }
        conditions = []

        def parse_node(node, parent_key=None, parent_dao=None):
            # A list must be checked before the dict guard, otherwise this branch is unreachable.
            if isinstance(node, list):
                # Without a parent attribute there is nothing to compare against.
                if parent_key is not None:
                    conditions.append((parent_key, "IN", node))
                return

            if not isinstance(node, dict):
                return

            for key, value in node.items():
                if isinstance(key, property):
                    key = key.fget.__name__

                external_fields_table_name_by_parent = self._get_external_field_key(parent_key)
                external_fields_table_name = self._get_external_field_key(key)
                external_field = (
                    external_fields_table_name
                    if external_fields_table_name_by_parent is None
                    else external_fields_table_name_by_parent
                )

                if key == "fuzzy":
                    self._handle_fuzzy_filter_conditions(conditions, external_table_deps, value)
                elif parent_dao is not None and key in parent_dao.__db_names:
                    parse_node(value, f"{parent_dao.table_name}.{key}")
                    continue
                elif external_field is not None:
                    external_table_deps.append(external_field)
                    parse_node(value, f"{external_field}.{key}")
                elif parent_key in self.__foreign_table_keys:
                    if key in operators:
                        parse_node({key: value}, self.__foreign_table_keys[parent_key])
                        continue

                    if parent_key in self.__foreign_dao:
                        foreign_dao = self.__foreign_dao[parent_key]
                        if key in foreign_dao.__foreign_tables:
                            parse_node(
                                value,
                                f"{self.__foreign_tables[parent_key][0]}.{foreign_dao.__foreign_table_keys[key]}",
                                foreign_dao.__foreign_dao[key],
                            )
                            continue

                    if parent_key in self.__foreign_tables:
                        parse_node(value, f"{self.__foreign_tables[parent_key][0]}.{key}")
                        continue

                    parse_node({parent_key: value})
                elif key in operators:
                    operator = operators[key]
                    if key in ("contains", "notContains"):
                        value = f"%{value}%"
                    elif key == "startsWith":
                        value = f"{value}%"
                    elif key == "endsWith":
                        value = f"%{value}"
                    elif key in ("isNull", "isNotNull"):
                        is_null_value = value.get("equal", None) if isinstance(value, dict) else value

                        if is_null_value is None:
                            operator = operators[key]
                        elif (key == "isNull" and is_null_value) or (key == "isNotNull" and not is_null_value):
                            operator = "IS NULL"
                        else:
                            operator = "IS NOT NULL"

                        # NULL checks carry no value; the condition is appended once below
                        value = None
                    elif key in ("equal", "notEqual") and value is None:
                        # Comparing with NULL needs IS NULL / IS NOT NULL instead of = / !=
                        operator = operators["isNull"] if key == "equal" else operators["isNotNull"]

                    conditions.append((parent_key, operator, value))
                elif isinstance(value, dict):
                    if key in self.__foreign_table_keys:
                        parse_node(value, key)
                    elif key in self.__db_names and parent_key is not None:
                        parse_node({f"{parent_key}": value})
                    elif key in self.__db_names:
                        parse_node(value, self.__db_names[key])
                    else:
                        parse_node(value, key)
                elif value is None:
                    conditions.append((self.__db_names[key], "IS NULL", value))
                else:
                    conditions.append((self.__db_names[key], "=", value))

        parse_node(graphql_structure)
        return conditions

    def _handle_fuzzy_filter_conditions(self, conditions, external_field_table_deps, sub_values):
        """
        Resolve a fuzzy filter into one combined levenshtein condition and append it to conditions
        :param conditions: List of (attribute, operator, value) tuples to append to
        :param external_field_table_deps: List to track external table dependencies
        :param sub_values: Fuzzy filter structure with "fields", "term" and optional "threshold"
        :raises ValueError: If "fields" or "term" is missing or empty
        """
        # Extract fuzzy filter parameters
        fuzzy_fields = get_value(sub_values, "fields", list[str])
        fuzzy_term = get_value(sub_values, "term", str)
        fuzzy_threshold = get_value(sub_values, "threshold", int, 5)

        if not fuzzy_fields or not fuzzy_term:
            raise ValueError("Fuzzy filter must include 'fields' and 'term'.")

        fuzzy_fields_db_names = []

        # Map fields to their database names
        for fuzzy_field in fuzzy_fields:
            external_fields_table_name = self._get_external_field_key(fuzzy_field)
            if external_fields_table_name is not None:
                external_fields_table = self._external_fields[external_fields_table_name]
                fuzzy_fields_db_names.append(f"{external_fields_table.table_name}.{fuzzy_field}")
                external_field_table_deps.append(external_fields_table.table_name)
            elif fuzzy_field in self.__db_names:
                fuzzy_fields_db_names.append(f"{self._table_name}.{self.__db_names[fuzzy_field]}")
            elif fuzzy_field in self.__foreign_tables:
                fuzzy_fields_db_names.append(f"{self._table_name}.{self.__foreign_table_keys[fuzzy_field]}")
            else:
                # __db_names maps to plain strings; indexing with [0] would keep only the first character
                fuzzy_fields_db_names.append(self.__db_names[String.to_snake_case(fuzzy_field)])

        # Build fuzzy conditions for each field
        fuzzy_conditions = self._build_fuzzy_conditions(fuzzy_fields_db_names, fuzzy_term, fuzzy_threshold)

        # Combine conditions with OR and append to the main conditions
        conditions.append((f"({' OR '.join(fuzzy_conditions)})", "fuzzy", None))

    @staticmethod
    def _build_fuzzy_conditions(fields: list[str], term: str, threshold: int = 10) -> list[str]:
        """
        Build one levenshtein condition per field
        :param list[str] fields: Qualified database field names
        :param str term: Search term, single quotes are escaped
        :param int threshold: Maximum levenshtein distance
        :return: List of SQL conditions
        """
        escaped_term = DataAccessObjectABC._escape_sql_string(term)
        return [
            f"levenshtein({field}::TEXT, '{escaped_term}') <= {threshold}"  # Adjust the threshold as needed
            for field in fields
        ]

    def _get_external_field_key(self, field_name: str) -> Optional[str]:
        """
        Returns the key to get the external field if found, otherwise None.
        :param str field_name: The name of the field to search for.
        :return: The key if found, otherwise None.
        :rtype: Optional[str]
        """
        if field_name is None:
            return None

        for key, builder in self._external_fields.items():
            # Fields that are also mapped attributes of this DAO are not treated as external
            if field_name in builder.fields and field_name not in self.__db_names:
                return key

        return None

    def _get_recursive_reference_join(self, attr: str) -> Optional[tuple[str, str]]:
        """
        Resolve the join needed for an attribute qualified with a foreign table name
        :param str attr: Attribute, optionally qualified as "<table>.<field>"
        :return: Tuple of table name and join condition, None if no join is known or needed
        """
        parts = attr.split(".")
        table_name = ".".join(parts[:-1])

        if table_name == self._table_name or table_name == "":
            return None

        all_foreign_tables = {
            x[0]: x[1]
            for x in [
                *[x for x in self.__foreign_tables.values() if x[0] != self._table_name],
                *[x for fdao in self.__foreign_dao for x in self.__foreign_dao[fdao].__foreign_tables.values()],
            ]
        }

        if table_name not in all_foreign_tables:
            return None

        return table_name, all_foreign_tables[table_name]

    def _build_sorts(
        self,
        builder: SQLSelectBuilder,
        sorts: AttributeSorts,
        external_table_deps: list[str],
    ):
        """
        Resolves complex sorting structures into SQL-compatible sorting conditions.
        Tracks external table dependencies.
        :param builder: The SQLBuilder instance to add sorting to.
        :param sorts: Sorting attributes and directions in a complex structure.
        :param external_table_deps: List to track external table dependencies.
        :raises ValueError: If a sort direction or the sort structure is invalid
        """

        def parse_sort_node(node, parent_key=None):
            if isinstance(node, dict):
                for key, value in node.items():
                    if isinstance(value, dict):
                        # Recursively parse nested structures
                        parse_sort_node(value, key)
                    elif isinstance(value, str) and value.lower() in ["asc", "desc"]:
                        external_table = self._get_external_field_key(key)
                        if external_table:
                            external_table_deps.append(external_table)
                            key = f"{external_table}.{key}"

                        if parent_key in self.__foreign_tables:
                            key = f"{self.__foreign_tables[parent_key][0]}.{key}"

                        builder.with_order_by(key, value.upper())
                    else:
                        raise ValueError(f"Invalid sort direction: {value}")
            elif isinstance(node, list):
                for item in node:
                    parse_sort_node(item)
            else:
                raise ValueError(f"Invalid sort structure: {node}")

        parse_sort_node(sorts)

    @staticmethod
    def _escape_sql_string(value: str) -> str:
        """
        Escape a string for use inside a single quoted SQL literal
        :param str value: Raw string
        :return: String with every single quote doubled
        """
        return value.replace("'", "''")

    def _get_value_sql(self, value: Any) -> str:
        """
        Convert a python value to its SQL literal
        :param Any value: Value to convert
        :return: SQL literal; strings are quoted with escaped single quotes, None and the string "null" become NULL
        """
        if isinstance(value, str):
            if value.lower() == "null":
                return "NULL"
            return f"'{self._escape_sql_string(value)}'"

        if value is None:
            return "NULL"

        if isinstance(value, Enum):
            return f"'{self._escape_sql_string(str(value.value))}'"

        # bool is checked before the generic str() fallback to emit true/false
        if isinstance(value, bool):
            return "true" if value else "false"

        if isinstance(value, list):
            if len(value) == 0:
                return "()"
            return f"({', '.join([self._get_value_sql(x) for x in value])})"

        if isinstance(value, datetime.datetime):
            # Naive datetimes are tagged as UTC before formatting
            if value.tzinfo is None:
                value = value.replace(tzinfo=datetime.timezone.utc)

            return f"'{value.strftime(DATETIME_FORMAT)}'"

        return str(value)

    @staticmethod
    def _get_value_from_sql(cast_type: type, value: Any) -> Optional[T]:
        """
        Get the value from the query result and cast it to the correct type
        :param type cast_type: Type to cast the value to
        :param Any value: Value from the query result
        :return Optional[T]: Casted value, when value is str "NULL" None is returned
        """
        # Only the exact string "NULL" is a null marker; other strings containing "NULL" are kept
        if isinstance(value, str) and value == "NULL":
            return None

        if value is None:
            return None

        if isinstance(value, cast_type):
            return value

        return cast_type(value)

    def _get_primary_key_value_sql(self, obj: T_DBM) -> str:
        """
        Get the primary key value of the given object as SQL literal
        :param T_DBM obj: Object to read the primary key from
        :return: Quoted and escaped literal for strings, str(value) otherwise
        """
        value = getattr(obj, self.__primary_key)
        if isinstance(value, str):
            return f"'{self._escape_sql_string(value)}'"

        return str(value)

    @staticmethod
    def _attr_from_date_to_char(attr: str) -> str:
        """
        Wrap the attribute in TO_CHAR so it can be compared with formatted date strings
        :param str attr: Attribute to wrap
        :return: TO_CHAR expression
        """
        return f"TO_CHAR({attr}, 'YYYY-MM-DD HH24:MI:SS.US TZ')"

    @staticmethod
    async def _get_editor_id(obj: T_DBM):
        """
        Get the editor id used for write statements
        :param T_DBM obj: Object to read editor_id from
        :return: editor_id of the object, otherwise the id of the current user, otherwise "NULL"
        """
        editor_id = obj.editor_id
        if editor_id is None:
            from cpl.core.ctx.user_context import get_user

            user = get_user()
            if user is not None:
                editor_id = user.id

        return editor_id if editor_id is not None else "NULL"