All checks were successful
Build on push / prepare (push) Successful in 9s
Build on push / query (push) Successful in 18s
Build on push / core (push) Successful in 24s
Build on push / dependency (push) Successful in 17s
Build on push / database (push) Successful in 15s
Build on push / translation (push) Successful in 15s
Build on push / mail (push) Successful in 18s
Build on push / application (push) Successful in 19s
Build on push / auth (push) Successful in 16s
121 lines
4.2 KiB
Python
121 lines
4.2 KiB
Python
from cpl.auth.permission.permissions import Permissions
|
|
from cpl.auth.permission.permissions_registry import PermissionsRegistry
|
|
from cpl.auth.schema import (
|
|
Permission,
|
|
Role,
|
|
RolePermission,
|
|
ApiKey,
|
|
ApiKeyPermission,
|
|
PermissionDao,
|
|
RoleDao,
|
|
RolePermissionDao,
|
|
ApiKeyDao,
|
|
ApiKeyPermissionDao,
|
|
)
|
|
from cpl.core.utils.get_value import get_value
|
|
from cpl.database.abc.data_seeder_abc import DataSeederABC
|
|
from cpl.database.db_logger import DBLogger
|
|
|
|
# Module-level logger used by PermissionSeeder to report the seeding state.
_logger = DBLogger(__name__)
|
|
|
|
|
|
class PermissionSeeder(DataSeederABC):
    """Keep the stored permissions in sync with ``PermissionsRegistry``.

    On ``seed`` the stored permissions are compared with the names returned by
    ``PermissionsRegistry.get()``: stored permissions that are no longer
    registered are hard-deleted, registered permissions that are not stored
    yet are created, missing descriptions are filled in, and every stored
    permission is assigned to the "admin" role and the "admin" API key when
    those exist.
    """

    def __init__(
        self,
        permission_dao: PermissionDao,
        role_dao: RoleDao,
        role_permission_dao: RolePermissionDao,
        api_key_dao: ApiKeyDao,
        api_key_permission_dao: ApiKeyPermissionDao,
    ):
        """Store the DAOs used to read and write permissions and their assignments.

        Args:
            permission_dao: Access to the stored permissions.
            role_dao: Used to look up the admin role.
            role_permission_dao: Access to role/permission assignments.
            api_key_dao: Used to look up the admin API key.
            api_key_permission_dao: Access to API key/permission assignments.
        """
        DataSeederABC.__init__(self)
        self._permission_dao = permission_dao
        self._role_dao = role_dao
        self._role_permission_dao = role_permission_dao
        self._api_key_dao = api_key_dao
        self._api_key_permission_dao = api_key_permission_dao

    async def seed(self):
        """Synchronise the stored permissions with the registered ones.

        When the stored permissions already match the registry only missing
        descriptions are updated. Otherwise stale permissions are hard-deleted,
        missing ones are created, descriptions are updated and the admin role
        and admin API key receive every permission they do not have yet.
        """
        permissions = await self._permission_dao.get_all()
        possible_permissions = list(PermissionsRegistry.get())

        # Plain list membership is used on purpose: it compares names with
        # ``==`` exactly like the original checks did.
        permission_names = [permission.name for permission in permissions]
        to_delete = [permission for permission in permissions if permission.name not in possible_permissions]
        missing = [permission for permission in possible_permissions if permission not in permission_names]

        # Comparing only the row counts would treat a table with the right
        # number of rows but stale or renamed permissions as complete, so the
        # names have to match as well before the sync is skipped.
        if len(permissions) == len(possible_permissions) and not to_delete and not missing:
            _logger.info("Permissions already existing")
            await self._update_missing_descriptions()
            return

        await self._permission_dao.delete_many(to_delete, hard_delete=True)

        _logger.warning("Permissions incomplete")
        descriptions = PermissionsRegistry.descriptions()
        await self._permission_dao.create_many(
            [Permission(0, permission, get_value(descriptions, permission, str)) for permission in missing]
        )
        await self._update_missing_descriptions()

        await self._add_missing_to_role()
        await self._add_missing_to_api_key()

    async def _add_missing_to_role(self):
        """Assign every stored permission to the admin role if it is not assigned yet.

        Does nothing when no role is found for the id 1 / name "admin" filter.
        """
        admin_role = await self._role_dao.find_single_by([{Role.id: 1}, {Role.name: "admin"}])
        if admin_role is None:
            return

        # Soft-deleted assignments are included so they are not created a second time.
        admin_permissions = await self._role_permission_dao.get_by_role_id(admin_role.id, with_deleted=True)
        # Build the lookup once instead of once per stored permission.
        assigned_ids = {assignment.permission_id for assignment in admin_permissions}

        all_permissions = await self._permission_dao.get_all()
        to_assign = [
            RolePermission(0, admin_role.id, permission.id)
            for permission in all_permissions
            if permission.id not in assigned_ids
        ]
        await self._role_permission_dao.create_many(to_assign)

    async def _add_missing_to_api_key(self):
        """Assign every stored permission to the admin API key if it is not assigned yet.

        Does nothing when no API key is found for the id 1 / identifier "admin" filter.
        """
        admin_api_key = await self._api_key_dao.find_single_by([{ApiKey.id: 1}, {ApiKey.identifier: "admin"}])
        if admin_api_key is None:
            return

        # Soft-deleted assignments are included so they are not created a second time.
        admin_permissions = await self._api_key_permission_dao.find_by_api_key_id(admin_api_key.id, with_deleted=True)
        # Build the lookup once instead of once per stored permission.
        assigned_ids = {assignment.permission_id for assignment in admin_permissions}

        all_permissions = await self._permission_dao.get_all()
        to_assign = [
            ApiKeyPermission(0, admin_api_key.id, permission.id)
            for permission in all_permissions
            if permission.id not in assigned_ids
        ]
        await self._api_key_permission_dao.create_many(to_assign)

    async def _update_missing_descriptions(self):
        """Fill in the description of stored permissions that have none.

        Only permissions returned by the ``description: None`` filter are
        touched, and only those for which the registry provides a description.
        """
        permissions = {
            permission.name: permission
            for permission in await self._permission_dao.find_by([{Permission.description: None}])
        }
        if not permissions:
            return

        # NOTE(review): keys are read through ``.value`` here, while ``seed``
        # looks descriptions up by the registry entry itself - confirm both
        # agree with the key type used by ``PermissionsRegistry.descriptions()``.
        to_update = []
        for key, description in PermissionsRegistry.descriptions().items():
            permission = permissions.get(key.value)
            if permission is None:
                continue

            permission.description = description
            to_update.append(permission)

        if not to_update:
            return

        await self._permission_dao.update_many(to_update)
|