Keycloak role management
All checks were successful
Build dev on push / build-api (push) Successful in 32s
Build dev on push / build-redirector (push) Successful in 33s
Build dev on push / build-web (push) Successful in 56s

Closes #10
This commit is contained in:
Sven Heidemann 2025-01-11 14:30:54 +01:00
parent 854e00882b
commit cb3cd05cf2
3 changed files with 55 additions and 2 deletions

View File

@ -52,7 +52,11 @@ class RouteUserExtension:
if not user_id: if not user_id:
return None return None
return await userDao.find_by_keycloak_id(user_id) user = await userDao.find_by_keycloak_id(user_id)
if user is None:
return None
return user
@classmethod @classmethod
async def get_dev_user(cls) -> Optional[User]: async def get_dev_user(cls) -> Optional[User]:
@ -67,6 +71,50 @@ class RouteUserExtension:
user = await cls.get_dev_user() user = await cls.get_dev_user()
return user return user
@classmethod
def _flatten_groups(cls, groups):
flat_list = []
for group in groups:
flat_list.append(group)
if 'subGroups' in group and group['subGroups']:
flat_list.extend(cls._flatten_groups(group['subGroups']))
return flat_list
@classmethod
async def _map_keycloak_groups_with_roles(cls, user: User):
try:
roles = {x.name: x for x in await roleDao.get_all()}
groups = cls._flatten_groups(Keycloak.admin.get_groups(full_hierarchy=True))
groups_with_role = [x["name"] for x in groups if x["name"] in roles.keys()]
user_groups_with_role = [
x["name"] for x in Keycloak.admin.get_user_groups(user.keycloak_id) if x["name"] in roles.keys()
]
user_roles = set(x.name for x in await user.roles if x.name in groups_with_role)
missing_groups = set(user_groups_with_role) - set(user_roles)
missing_roles = set(user_roles) - set(user_groups_with_role)
if len(missing_groups) > 0:
await roleUserDao.create_many(
[
RoleUser(0, (await roleDao.get_by_name(group)).id, user.id)
for group in missing_groups
]
)
if len(missing_roles) > 0:
await roleUserDao.delete_many(
[
await roleUserDao.get_single_by([
{RoleUser.role_id: roles[role].id},
{RoleUser.user_id: user.id},
])
for role in missing_roles
]
)
except Exception as e:
logger.error("Failed to map user groups", e)
@classmethod @classmethod
async def _create_user(cls, kc_user: KeycloakUser): async def _create_user(cls, kc_user: KeycloakUser):
try: try:
@ -101,8 +149,11 @@ class RouteUserExtension:
user = await cls.get_user() user = await cls.get_user()
if user is None: if user is None:
await cls._create_user(KeycloakUser(user_info)) u_id = await cls._create_user(KeycloakUser(user_info))
await cls._map_keycloak_groups_with_roles(await userDao.get_by_id(u_id))
return True return True
else:
await cls._map_keycloak_groups_with_roles(user)
if user.deleted: if user.deleted:
return False return False

View File

@ -150,6 +150,7 @@
"update": "Bearbeiten" "update": "Bearbeiten"
}, },
"user": { "user": {
"assign_roles": "Rollen zuweisen",
"count_header": "Benutzer", "count_header": "Benutzer",
"email": "E-Mail", "email": "E-Mail",
"user": "Benutzer", "user": "Benutzer",

View File

@ -150,6 +150,7 @@
"update": "Update" "update": "Update"
}, },
"user": { "user": {
"assign_roles": "Assign roles",
"count_header": "User(s)", "count_header": "User(s)",
"email": "E-Mail", "email": "E-Mail",
"user": "User", "user": "User",