Added/Fixed api_key/user/role gql #181
Some checks failed
Test before pr merge / test-lint (pull_request) Failing after 6s

This commit is contained in:
2025-09-29 21:00:24 +02:00
parent 262e26cb83
commit e362b7fb61
25 changed files with 311 additions and 44 deletions

View File

@@ -1,14 +1,14 @@
from datetime import datetime
from typing import Optional, Self
from typing import Self
from async_property import async_property
from cpl.core.typing import SerialId
from cpl.database.abc import DbModelABC
from cpl.dependency import ServiceProvider, get_provider
from cpl.database.abc import DbJoinModelABC
from cpl.dependency import get_provider
class RolePermission(DbModelABC[Self]):
class RolePermission(DbJoinModelABC[Self]):
def __init__(
self,
id: SerialId,
@@ -19,28 +19,26 @@ class RolePermission(DbModelABC[Self]):
created: datetime | None = None,
updated: datetime | None = None,
):
DbModelABC.__init__(self, id, deleted, editor_id, created, updated)
self._role_id = role_id
self._permission_id = permission_id
DbJoinModelABC.__init__(self, id, role_id, permission_id, deleted, editor_id, created, updated)
@property
def role_id(self) -> int:
return self._role_id
return self._source_id
@async_property
async def role(self):
from cpl.auth.schema._permission.role_dao import RoleDao
role_dao: RoleDao = get_provider().get_service(RoleDao)
return await role_dao.get_by_id(self._role_id)
return await role_dao.get_by_id(self._source_id)
@property
def permission_id(self) -> int:
return self._permission_id
return self._foreign_id
@async_property
async def permission(self):
from cpl.auth.schema._permission.permission_dao import PermissionDao
permission_dao: PermissionDao = get_provider().get_service(PermissionDao)
return await permission_dao.get_by_id(self._permission_id)
return await permission_dao.get_by_id(self._foreign_id)

View File

@@ -2,10 +2,11 @@ import functools
import inspect
import types
from abc import ABC
from asyncio import iscoroutinefunction, iscoroutine
from asyncio import iscoroutinefunction
from typing import Callable, Type, Any, Optional
import strawberry
from async_property.base import AsyncPropertyDescriptor
from strawberry.exceptions import StrawberryException
from cpl.api import Unauthorized, Forbidden
@@ -169,6 +170,15 @@ class QueryABC(StrawberryProtocol, ABC):
except StrawberryException as e:
raise Exception(f"Error converting field '{f.name}' to strawberry field: {e}") from e
@staticmethod
def _type_to_strawberry(t: Type) -> Type:
_t = get_provider().get_service(t)
if isinstance(_t, StrawberryProtocol):
return _t.to_strawberry()
return t
def to_strawberry(self) -> Type:
cls = self.__class__
if TypeCollector.has(cls):
@@ -183,22 +193,35 @@ class QueryABC(StrawberryProtocol, ABC):
for name, f in self._fields.items():
t = f.type
if isinstance(name, property):
name = name.fget.__name__
if isinstance(name, AsyncPropertyDescriptor):
name = name.field_name
if isinstance(t, types.GenericAlias):
t = t.__args__[0]
if callable(t) and not isinstance(t, type):
_t = get_provider().get_service(t())
if isinstance(_t, StrawberryProtocol):
t = _t.to_strawberry()
else:
t = _t
t = self._type_to_strawberry(t())
elif issubclass(t, StrawberryProtocol):
t = self._type_to_strawberry(t)
annotations[name] = t if not f.optional else Optional[t]
namespace[name] = self._field_to_strawberry(f)
namespace["__annotations__"] = annotations
for k, v in namespace.items():
if isinstance(k, property):
k = k.fget.__name__
if isinstance(k, AsyncPropertyDescriptor):
k = k.field_name
setattr(gql_cls, k, v)
gql_cls.__annotations__ = annotations
gql_type = strawberry.type(gql_cls)
try:
gql_cls.__annotations__ = annotations
gql_type = strawberry.type(gql_cls)
except Exception as e:
raise Exception(f"Error creating strawberry type for '{cls.__name__}': {e}") from e
TypeCollector.set(cls, gql_type)
return gql_type

View File

@@ -4,16 +4,13 @@ from typing import Self
from cpl.api.application import WebApp
from cpl.api.model.validation_match import ValidationMatch
from cpl.auth.schema import UserDao
from cpl.core.configuration import Configuration
from cpl.application.abc.application_abc import __not_implemented__
from cpl.core.environment import Environment
from cpl.dependency.service_provider import ServiceProvider
from cpl.dependency.typing import Modules
from queries.user import UserGraphType, UserFilter, UserSort
from cpl.graphql._endpoints.graphiql import graphiql_endpoint
from cpl.graphql._endpoints.graphql import graphql_endpoint
from cpl.graphql._endpoints.playground import playground_endpoint
from cpl.graphql.auth.administration.user.user_mutation import UserMutation
from cpl.graphql.graphql_module import GraphQLModule
from cpl.graphql.service.schema import Schema
@@ -91,18 +88,20 @@ class GraphQLApp(WebApp):
return self
def with_auth_root_queries(self, public: bool = False):
if not Configuration.get("GraphQLAuthModuleEnabled", False):
raise Exception("GraphQLAuthModule is not loaded yet. Make sure to run 'add_module(GraphQLAuthModule)'")
try:
from cpl.graphql.auth.graphql_auth_module import GraphQLAuthModule
schema = self._services.get_service(Schema)
schema.query.dao_collection_field(UserGraphType, UserDao, "users", UserFilter, UserSort).with_public(public)
GraphQLAuthModule.with_auth_root_queries(self._services, public=public)
except ImportError:
__not_implemented__("cpl-auth & cpl-graphql", self.with_auth_root_mutations)
def with_auth_root_mutations(self, public: bool = False):
if not Configuration.get("GraphQLAuthModuleEnabled", False):
raise Exception("GraphQLAuthModule is not loaded yet. Make sure to run 'add_module(GraphQLAuthModule)'")
try:
from cpl.graphql.auth.graphql_auth_module import GraphQLAuthModule
schema = self._services.get_service(Schema)
schema.mutation.with_mutation("user", UserMutation).with_public(public)
GraphQLAuthModule.with_auth_root_mutations(self._services, public=public)
except ImportError:
__not_implemented__("cpl-auth & cpl-graphql", self.with_auth_root_mutations)
async def _log_before_startup(self):
host = self._api_settings.host

View File

@@ -2,7 +2,7 @@ from cpl.auth.schema import ApiKey, RolePermissionDao
from cpl.graphql.schema.db_model_graph_type import DbModelGraphType
class ApiKeyGraphType(DbModelGraphType):
class ApiKeyGraphType(DbModelGraphType[ApiKey]):
def __init__(self, role_permission_dao: RolePermissionDao):
DbModelGraphType.__init__(self)

View File

@@ -2,7 +2,7 @@ from cpl.api import APILogger
from cpl.auth.keycloak import KeycloakAdmin
from cpl.auth.permission import Permissions
from cpl.auth.schema import ApiKey, ApiKeyDao, ApiKeyPermissionDao, ApiKeyPermission
from cpl.graphql.auth.administration.api_key.api_key_input import ApiKeyUpdateInput, ApiKeyCreateInput
from cpl.graphql.auth.api_key.api_key_input import ApiKeyUpdateInput, ApiKeyCreateInput
from cpl.graphql.schema.mutation import Mutation

View File

@@ -0,0 +1,9 @@
from cpl.auth.schema import ApiKey
from cpl.graphql.schema.sort.db_model_sort import DbModelSort
from cpl.graphql.schema.sort.sort_order import SortOrder
class ApiKeySort(DbModelSort[ApiKey]):
def __init__(self):
DbModelSort.__init__(self)
self.field("identifier", SortOrder)

View File

@@ -1,20 +1,40 @@
from cpl.auth.schema import UserDao, ApiKeyDao, RoleDao
from cpl.core.configuration import Configuration
from cpl.dependency import ServiceProvider
from cpl.dependency.module.module import Module
from cpl.dependency.service_collection import ServiceCollection
from cpl.graphql.auth.administration.api_key.api_key_filter import ApiKeyFilter
from cpl.graphql.auth.administration.api_key.api_key_graph_type import ApiKeyGraphType
from cpl.graphql.auth.administration.api_key.api_key_mutation import ApiKeyMutation
from cpl.graphql.auth.administration.user.user_filter import UserFilter
from cpl.graphql.auth.administration.user.user_graph_type import UserGraphType
from cpl.graphql.auth.administration.user.user_mutation import UserMutation
from cpl.graphql.auth.api_key.api_key_filter import ApiKeyFilter
from cpl.graphql.auth.api_key.api_key_graph_type import ApiKeyGraphType
from cpl.graphql.auth.api_key.api_key_mutation import ApiKeyMutation
from cpl.graphql.auth.api_key.api_key_sort import ApiKeySort
from cpl.graphql.auth.role.role_filter import RoleFilter
from cpl.graphql.auth.role.role_graph_type import RoleGraphType
from cpl.graphql.auth.role.role_mutation import RoleMutation
from cpl.graphql.auth.role.role_sort import RoleSort
from cpl.graphql.auth.user.user_filter import UserFilter
from cpl.graphql.auth.user.user_graph_type import UserGraphType
from cpl.graphql.auth.user.user_mutation import UserMutation
from cpl.graphql.auth.user.user_sort import UserSort
from cpl.graphql.graphql_module import GraphQLModule
from cpl.graphql.service.schema import Schema
class GraphQLAuthModule(Module):
dependencies = [GraphQLModule]
transient = [UserGraphType, UserMutation, UserFilter, ApiKeyGraphType, ApiKeyMutation, ApiKeyFilter]
transient = [
UserGraphType,
UserMutation,
UserFilter,
UserSort,
ApiKeyGraphType,
ApiKeyMutation,
ApiKeyFilter,
ApiKeySort,
RoleGraphType,
RoleMutation,
RoleFilter,
RoleSort,
]
@staticmethod
def register(collection: ServiceCollection):
@@ -24,3 +44,27 @@ class GraphQLAuthModule(Module):
def configure(provider: ServiceProvider):
schema = provider.get_service(Schema)
schema.with_type(UserGraphType)
schema.with_type(ApiKeyGraphType)
schema.with_type(RoleGraphType)
@staticmethod
def with_auth_root_queries(provider: ServiceProvider, public: bool = False):
if not Configuration.get("GraphQLAuthModuleEnabled", False):
raise Exception("GraphQLAuthModule is not loaded yet. Make sure to run 'add_module(GraphQLAuthModule)'")
schema = provider.get_service(Schema)
schema.query.dao_collection_field(UserGraphType, UserDao, "users", UserFilter, UserSort).with_public(public)
schema.query.dao_collection_field(ApiKeyGraphType, ApiKeyDao, "apiKeys", ApiKeyFilter, ApiKeySort).with_public(
public
)
schema.query.dao_collection_field(RoleGraphType, RoleDao, "roles", RoleFilter, RoleSort).with_public(public)
@staticmethod
def with_auth_root_mutations(provider: ServiceProvider, public: bool = False):
if not Configuration.get("GraphQLAuthModuleEnabled", False):
raise Exception("GraphQLAuthModule is not loaded yet. Make sure to run 'add_module(GraphQLAuthModule)'")
schema = provider.get_service(Schema)
schema.mutation.with_mutation("user", UserMutation).with_public(public)
schema.mutation.with_mutation("apiKey", ApiKeyMutation).with_public(public)
schema.mutation.with_mutation("role", RoleMutation).with_public(public)

View File

@@ -0,0 +1,11 @@
from cpl.auth.schema import User, Role
from cpl.graphql.schema.filter.db_model_filter import DbModelFilter
from cpl.graphql.schema.filter.string_filter import StringFilter
class RoleFilter(DbModelFilter[Role]):
def __init__(self, public: bool = False):
DbModelFilter.__init__(self, public)
self.field("name", StringFilter).with_public(public)
self.field("description", StringFilter).with_public(public)

View File

@@ -0,0 +1,14 @@
from cpl.auth.schema import Role
from cpl.graphql.auth.user.user_graph_type import UserGraphType
from cpl.graphql.schema.db_model_graph_type import DbModelGraphType
class RoleGraphType(DbModelGraphType[Role]):
def __init__(self, public: bool = False):
DbModelGraphType.__init__(self)
self.string_field("name", lambda root: root.name).with_public(public)
self.string_field("description", lambda root: root.description).with_public(public)
self.list_field("permissions", str, lambda root: root.permissions).with_public(public)
self.list_field("users", UserGraphType, lambda root: root.users).with_public(public)

View File

@@ -0,0 +1,29 @@
from cpl.auth.schema import User, Role
from cpl.core.typing import SerialId
from cpl.graphql.schema.input import Input
class RoleCreateInput(Input[Role]):
name: str
description: str | None
permissions: list[SerialId] | None
def __init__(self):
Input.__init__(self)
self.string_field("name").with_required()
self.string_field("description")
self.list_field("permissions", SerialId)
class RoleUpdateInput(Input[Role]):
id: SerialId
name: str | None
description: str | None
permissions: list[SerialId] | None
def __init__(self):
Input.__init__(self)
self.int_field("id").with_required()
self.string_field("name")
self.string_field("description")
self.list_field("permissions", SerialId)

View File

@@ -0,0 +1,101 @@
from cpl.api import APILogger
from cpl.auth.keycloak import KeycloakAdmin
from cpl.auth.permission import Permissions
from cpl.auth.schema import RoleDao, Role, RolePermissionDao, RolePermission
from cpl.graphql.auth.role.role_input import RoleCreateInput, RoleUpdateInput
from cpl.graphql.schema.mutation import Mutation
class RoleMutation(Mutation):
def __init__(
self,
logger: APILogger,
role_dao: RoleDao,
role_permission_dao: RolePermissionDao,
permission_dao: RolePermissionDao,
keycloak_admin: KeycloakAdmin,
):
Mutation.__init__(self)
self._logger = logger
self._role_dao = role_dao
self._role_permission_dao = role_permission_dao
self._permission_dao = permission_dao
self._keycloak_admin = keycloak_admin
self.int_field(
"create",
self.resolve_create,
).with_require_any_permission(Permissions.roles_create).with_argument(
"input",
RoleCreateInput,
).with_required()
self.bool_field(
"update",
self.resolve_update,
).with_require_any_permission(Permissions.roles_update).with_argument(
"input",
RoleUpdateInput,
).with_required()
self.bool_field(
"delete",
self.resolve_delete,
).with_require_any_permission(Permissions.roles_delete).with_argument(
"id",
int,
).with_required()
self.bool_field(
"restore",
self.resolve_restore,
).with_require_any_permission(Permissions.roles_delete).with_argument(
"id",
int,
).with_required()
async def resolve_create(self, input: RoleCreateInput, *_):
self._logger.debug(f"create role: {input.__dict__}")
role = Role(
0,
input.name,
input.description,
)
await self._role_dao.create(role)
role = await self._role_dao.get_by_name(role.name)
await self._role_permission_dao.create_many([RolePermission(0, role.id, x) for x in input.permissions])
return role
async def resolve_update(self, input: RoleUpdateInput, *_):
self._logger.debug(f"update role: {input.__dict__}")
role = await self._role_dao.get_by_id(input.id)
role.name = input.get("name", role.name)
role.description = input.get("description", role.description)
await self._role_dao.update(role)
await self._resolve_assignments(
input.get("permissions", []),
role,
RolePermission.role_id,
RolePermission.permission_id,
self._role_dao,
self._role_permission_dao,
RolePermission,
self._permission_dao,
)
return role
async def resolve_delete(self, id: int):
self._logger.debug(f"delete role: {id}")
role = await self._role_dao.get_by_id(id)
await self._role_dao.delete(role)
return True
async def resolve_restore(self, id: int):
self._logger.debug(f"restore role: {id}")
role = await self._role_dao.get_by_id(id)
await self._role_dao.restore(role)
return True

View File

@@ -0,0 +1,10 @@
from cpl.auth.schema import Role
from cpl.graphql.schema.sort.db_model_sort import DbModelSort
from cpl.graphql.schema.sort.sort_order import SortOrder
class RoleSort(DbModelSort[Role]):
def __init__(self):
DbModelSort.__init__(self)
self.field("name", SortOrder)
self.field("description", SortOrder)

View File

@@ -2,7 +2,7 @@ from cpl.auth.schema import User
from cpl.graphql.schema.db_model_graph_type import DbModelGraphType
class UserGraphType(DbModelGraphType):
class UserGraphType(DbModelGraphType[User]):
def __init__(self):
DbModelGraphType.__init__(self)

View File

@@ -3,7 +3,7 @@ from cpl.auth.keycloak import KeycloakAdmin
from cpl.auth.permission import Permissions
from cpl.auth.schema import UserDao, User, RoleUser, RoleUserDao, RoleDao
from cpl.core.ctx.user_context import get_user
from cpl.graphql.auth.administration.user.user_input import UserCreateInput, UserUpdateInput
from cpl.graphql.auth.user.user_input import UserCreateInput, UserUpdateInput
from cpl.graphql.schema.mutation import Mutation

View File

@@ -0,0 +1,10 @@
from cpl.auth.schema import User
from cpl.graphql.schema.sort.db_model_sort import DbModelSort
from cpl.graphql.schema.sort.sort_order import SortOrder
class UserSort(DbModelSort[User]):
def __init__(self):
DbModelSort.__init__(self)
self.field("username", SortOrder)
self.field("email", SortOrder)

View File

@@ -25,7 +25,7 @@ class DbModelGraphType(GraphType[T], Generic[T]):
self.bool_field("deleted", lambda root: root.deleted).with_public(public)
if Configuration.get("GraphQLAuthModuleEnabled", False):
from cpl.graphql.auth.administration.user.user_graph_type import UserGraphType
from cpl.graphql.auth.user.user_graph_type import UserGraphType
self.object_field("editor", lambda: UserGraphType, lambda root: root.editor).with_public(public)
self.string_field("created", lambda root: root.created).with_public(public)

View File

@@ -15,7 +15,7 @@ class DbModelFilter(Filter[T], Generic[T]):
self.field("id", IntFilter).with_public(public)
self.field("deleted", BoolFilter).with_public(public)
if Configuration.get("GraphQLAuthModuleEnabled", False):
from cpl.graphql.auth.administration.user.user_filter import UserFilter
from cpl.graphql.auth.user.user_filter import UserFilter
self.field("editor", lambda: UserFilter).with_public(public)
self.field("created", DateFilter).with_public(public)

View File

@@ -0,0 +1,19 @@
from typing import Generic
from cpl.core.configuration import Configuration
from cpl.core.typing import T
from cpl.graphql.schema.sort.sort import Sort
from cpl.graphql.schema.sort.sort_order import SortOrder
class DbModelSort(Sort[T], Generic[T]):
def __init__(
self,
):
Sort.__init__(self)
self.field("id", SortOrder)
self.field("deleted", SortOrder)
if Configuration.get("GraphQLAuthModuleEnabled", False):
self.field("editor", SortOrder)
self.field("created", SortOrder)
self.field("updated", SortOrder)