This commit is contained in:
Sven Heidemann 2024-12-13 23:27:18 +01:00
commit 565f21429a
336 changed files with 31609 additions and 0 deletions

8
.gitignore vendored Normal file
View File

@ -0,0 +1,8 @@
.idea/
.vscode/
.code/
.angular/
dist/
node_modules/
venv/

4
api/.gitignore vendored Normal file
View File

@ -0,0 +1,4 @@
src/files/
src/logs/
src/.env

0
api/README.md Normal file
View File

15
api/dockerfile Normal file
View File

@ -0,0 +1,15 @@
# syntax=docker/dockerfile:1
FROM python:3.12.7-alpine
WORKDIR /app
COPY ./src/ .
COPY ./requirements.txt .
RUN python -m pip install --upgrade pip
RUN python -m pip install -r requirements.txt
RUN apk update
RUN apk add bash
RUN apk add nano
CMD [ "bash", "/app/open-redirect"]

9
api/pytest.ini Normal file
View File

@ -0,0 +1,9 @@
[pytest]
minversion = 6.0
addopts = -ra -q
pythonpath = src
testpaths = tests
python_files = *.py
asyncio_mode=auto
asyncio_default_fixture_loop_scope="function"

5
api/requirements-dev.txt Normal file
View File

@ -0,0 +1,5 @@
black==24.10.0
pygount==1.8.0
pytest==8.3.4
pytest-asyncio==0.24.0
pytest-mock==3.14.0

10
api/requirements.txt Normal file
View File

@ -0,0 +1,10 @@
ariadne==0.23.0
eventlet==0.37.0
graphql-core==3.2.5
Flask[async]==3.1.0
Flask-Cors==5.0.0
async-property==0.2.2
python-keycloak==4.7.3
psycopg[binary]==3.2.3
psycopg-pool==3.2.4
Werkzeug==3.1.3

15
api/src/.env.template Normal file
View File

@ -0,0 +1,15 @@
ENVIRONMENT=
CLIENT_URLS=
DB_HOST=
DB_PORT=
DB_USER=
DB_PASSWORD=
DB_DATABASE=
KEYCLOAK_URL=
KEYCLOAK_CLIENT_ID=
KEYCLOAK_REALM=
KEYCLOAK_CLIENT_SECRET=
PLAYGROUND_USE_ADMIN_API_KEY= # be very careful with this one, it opens up the playground to all requests

0
api/src/api/__init__.py Normal file
View File

78
api/src/api/api.py Normal file
View File

@ -0,0 +1,78 @@
import importlib
import os
import time
from uuid import uuid4
from flask import Flask, request, g
from api.route import Route
from core.logger import APILogger
app = Flask(__name__)
logger = APILogger(__name__)
def filter_relevant_headers(headers: dict) -> dict:
relevant_keys = {
"Content-Type",
"Host",
"Connection",
"User-Agent",
"Origin",
"Referer",
"Accept",
}
return {key: value for key, value in headers.items() if key in relevant_keys}
@app.before_request
async def log_request():
g.request_id = uuid4()
g.start_time = time.time()
logger.debug(
f"Request {g.request_id}: {request.method}@{request.path} from {request.remote_addr}"
)
user = await Route.get_user()
request_info = {
"headers": filter_relevant_headers(dict(request.headers)),
"args": request.args.to_dict(),
"form-data": request.form.to_dict(),
"payload": request.get_json(silent=True),
"user": f"{user.id}-{user.keycloak_id}" if user else None,
"files": (
{key: file.filename for key, file in request.files.items()}
if request.files
else None
),
}
logger.trace(f"Request {g.request_id}: {request_info}")
@app.after_request
def log_after_request(response):
# calc the time it took to process the request
duration = (time.time() - g.start_time) * 1000
logger.info(
f"Request finished {g.request_id}: {response.status_code}-{request.method}@{request.path} from {request.remote_addr} in {duration:.2f}ms"
)
return response
@app.errorhandler(Exception)
def handle_exception(e):
logger.error(f"Request {g.request_id}", e)
return {"error": str(e)}, 500
# used to import all routes
routes_dir = os.path.join(os.path.dirname(__file__), "routes")
for filename in os.listdir(routes_dir):
if filename.endswith(".py") and filename != "__init__.py":
module_name = f"api.routes.{filename[:-3]}"
importlib.import_module(module_name)
# Explicitly register the routes
for route, (view_func, options) in Route.registered_routes.items():
app.add_url_rule(route, view_func=view_func, **options)

View File

View File

@ -0,0 +1,26 @@
from typing import Optional
from keycloak import KeycloakOpenID, KeycloakAdmin, KeycloakOpenIDConnection
from core.environment import Environment
class Keycloak:
client: Optional[KeycloakOpenID] = None
admin: Optional[KeycloakAdmin] = None
@classmethod
def init(cls):
cls.client = KeycloakOpenID(
server_url=Environment.get("KEYCLOAK_URL", str),
client_id=Environment.get("KEYCLOAK_CLIENT_ID", str),
realm_name=Environment.get("KEYCLOAK_REALM", str),
client_secret_key=Environment.get("KEYCLOAK_CLIENT_SECRET", str),
)
connection = KeycloakOpenIDConnection(
server_url=Environment.get("KEYCLOAK_URL", str),
client_id=Environment.get("KEYCLOAK_CLIENT_ID", str),
realm_name=Environment.get("KEYCLOAK_REALM", str),
client_secret_key=Environment.get("KEYCLOAK_CLIENT_SECRET", str),
)
cls.admin = KeycloakAdmin(connection=connection)

View File

@ -0,0 +1,33 @@
from api.auth.keycloak_client import Keycloak
from core.get_value import get_value
class KeycloakUser:
def __init__(self, source: dict):
self._username = get_value(source, "preferred_username", str)
self._email = get_value(source, "email", str)
self._email_verified = get_value(source, "email_verified", bool)
self._name = get_value(source, "name", str)
@property
def username(self) -> str:
return self._username
@property
def email(self) -> str:
return self._email
@property
def email_verified(self) -> bool:
return self._email_verified
@property
def name(self) -> str:
return self._name
# Attrs from keycloak
@property
def id(self) -> str:
return Keycloak.admin.get_user_id(self._username)

9
api/src/api/errors.py Normal file
View File

@ -0,0 +1,9 @@
from flask import jsonify
def unauthorized():
return jsonify({"error": "Unauthorized"}), 401
def forbidden():
return jsonify({"error": "Unauthorized"}), 401

144
api/src/api/route.py Normal file
View File

@ -0,0 +1,144 @@
import functools
from functools import wraps
from inspect import iscoroutinefunction
from typing import Callable, Union, Optional
from urllib.request import Request
from flask import request
from flask_cors import cross_origin
from api.errors import unauthorized
from api.route_user_extension import RouteUserExtension
from core.environment import Environment
from data.schemas.administration.api_key import ApiKey
from data.schemas.administration.api_key_dao import apiKeyDao
from data.schemas.administration.user import User
class Route(RouteUserExtension):
registered_routes = {}
@classmethod
async def get_api_key(cls) -> ApiKey:
auth_header = request.headers.get("Authorization", None)
api_key = auth_header.split(" ")[1]
return await apiKeyDao.find_by_key(api_key)
@classmethod
async def _verify_api_key(cls, req: Request) -> bool:
auth_header = req.headers.get("Authorization", None)
if not auth_header or not auth_header.startswith("API-Key "):
return False
api_key = auth_header.split(" ")[1]
api_key_from_db = await apiKeyDao.find_by_key(api_key)
return api_key_from_db is not None and not api_key_from_db.deleted
@classmethod
async def _get_auth_type(cls, auth_header: str) -> Optional[Union[User, ApiKey]]:
if auth_header.startswith("Bearer "):
return await cls.get_user()
elif auth_header.startswith("API-Key "):
return await cls.get_api_key()
elif (
auth_header.startswith("DEV-User ")
and Environment.get_environment() == "development"
):
return await cls.get_dev_user()
return None
@classmethod
async def get_authenticated_user_or_api_key(cls) -> Union[User, ApiKey]:
auth_header = request.headers.get("Authorization", None)
if not auth_header:
raise Exception("No Authorization header found")
user_or_api_key = await cls._get_auth_type(auth_header)
if user_or_api_key is None:
raise Exception("Invalid Authorization header")
return user_or_api_key
@classmethod
async def get_authenticated_user_or_api_key_or_default(
cls,
) -> Optional[Union[User, ApiKey]]:
auth_header = request.headers.get("Authorization", None)
if not auth_header:
return None
return await cls._get_auth_type(auth_header)
@classmethod
async def is_authorized(cls) -> bool:
auth_header = request.headers.get("Authorization", None)
if not auth_header:
return False
if auth_header.startswith("Bearer "):
return await cls.verify_login(request)
elif auth_header.startswith("API-Key "):
return await cls._verify_api_key(request)
elif (
auth_header.startswith("DEV-User ")
and Environment.get_environment() == "development"
):
user = await cls.get_dev_user()
return user is not None
return False
@classmethod
def authorize(
cls,
f: Callable = None,
skip_in_dev=False,
by_api_key=False,
):
if f is None:
return functools.partial(
cls.authorize, skip_in_dev=skip_in_dev, by_api_key=by_api_key
)
@wraps(f)
async def decorator(*args, **kwargs):
if skip_in_dev and Environment.get_environment() == "development":
if iscoroutinefunction(f):
return await f(*args, **kwargs)
return f(*args, **kwargs)
if not await cls.is_authorized():
return unauthorized()
if iscoroutinefunction(f):
return await f(*args, **kwargs)
return f(*args, **kwargs)
return decorator
@classmethod
def route(cls, path=None, **kwargs):
def inner(fn):
cross_origin(fn)
cls.registered_routes[path] = (fn, kwargs)
return fn
return inner
@classmethod
def get(cls, path=None, **kwargs):
return cls.route(path, methods=["GET"], **kwargs)
@classmethod
def post(cls, path=None, **kwargs):
return cls.route(path, methods=["POST"], **kwargs)
@classmethod
def head(cls, path=None, **kwargs):
return cls.route(path, methods=["HEAD"], **kwargs)
@classmethod
def put(cls, path=None, **kwargs):
return cls.route(path, methods=["PUT"], **kwargs)
@classmethod
def delete(cls, path=None, **kwargs):
return cls.route(path, methods=["DELETE"], **kwargs)

View File

@ -0,0 +1,112 @@
from typing import Optional
from flask import request, Request, has_request_context
from keycloak import KeycloakAuthenticationError, KeycloakConnectionError
from api.auth.keycloak_client import Keycloak
from api.auth.keycloak_user import KeycloakUser
from core.get_value import get_value
from core.logger import Logger
from data.schemas.administration.user import User
from data.schemas.administration.user_dao import userDao
from data.schemas.permission.role_dao import roleDao
from data.schemas.permission.role_user import RoleUser
from data.schemas.permission.role_user_dao import roleUserDao
logger = Logger(__name__)
class RouteUserExtension:
@classmethod
def _get_user_id_from_token(cls) -> Optional[str]:
token = cls.get_token()
if not token:
return None
try:
user_info = Keycloak.client.userinfo(token)
except KeycloakAuthenticationError:
user_info = {}
except KeycloakConnectionError:
user_info = {}
return get_value(user_info, "sub", str)
@staticmethod
def get_token() -> Optional[str]:
if "Authorization" not in request.headers:
return None
if len(request.headers.get("Authorization").split()) < 2:
return None
return request.headers.get("Authorization").split()[1]
@classmethod
async def get_user(cls) -> Optional[User]:
if not has_request_context():
return None
user_id = cls._get_user_id_from_token()
if not user_id:
return None
return await userDao.find_by_keycloak_id(user_id)
@classmethod
async def get_dev_user(cls) -> Optional[User]:
return await userDao.find_single_by(
[{User.keycloak_id: cls.get_token()}, {User.deleted: False}]
)
@classmethod
async def get_user_or_default(cls) -> Optional[User]:
user = await cls.get_user()
if user is None:
user = await cls.get_dev_user()
return user
@classmethod
async def _create_user(cls, kc_user: KeycloakUser):
try:
user = await userDao.find_by_keycloak_id(kc_user.id)
if user is not None:
logger.warning(f"User {user.id}:{kc_user.id} already exists")
return
await userDao.create(User(0, kc_user.id))
users = await userDao.get_all()
if len(users) == 1:
user = await userDao.get_by_keycloak_id(kc_user.id)
admin_role = await roleDao.get_by_name("admin")
logger.warning(f"Assigning admin role to first user {user.id}")
await roleUserDao.create(RoleUser(0, admin_role.id, user.id))
except Exception as e:
logger.error("Failed to find or create user", e)
@classmethod
async def verify_login(cls, req: Request) -> bool:
auth_header = req.headers.get("Authorization", None)
if not auth_header or not auth_header.startswith("Bearer "):
return False
token = auth_header.split(" ")[1]
try:
# Verify token using Keycloak
user_info = Keycloak.client.userinfo(token)
if not user_info:
return False
user = await cls.get_user()
if user is None:
await cls._create_user(KeycloakUser(user_info))
return True
if user.deleted:
return False
except KeycloakAuthenticationError:
return False
return True

View File

View File

@ -0,0 +1,27 @@
from uuid import uuid4
from flask import send_file
from werkzeug.exceptions import NotFound
from api.route import Route
from core.logger import APILogger
logger = APILogger(__name__)
@Route.get(f"/api/files/<path:file_path>")
def get_file(file_path: str):
name = file_path
if "/" in file_path:
name = file_path.split("/")[-1]
try:
return send_file(
f"../files/{file_path}", download_name=name, as_attachment=True
)
except NotFound:
return {"error": "File not found"}, 404
except Exception as e:
error_id = uuid4()
logger.error(f"Error {error_id} getting file {file_path}", e)
return {"error": f"File error. ErrorId: {error_id}"}, 500

View File

@ -0,0 +1,27 @@
from ariadne import graphql
from flask import request, jsonify
from api.route import Route
from api_graphql.service.schema import schema
from core.logger import Logger
BasePath = f"/api/graphql"
logger = Logger(__name__)
@Route.post(f"{BasePath}")
async def graphql_endpoint():
data = request.get_json()
# Note: Passing the request to the context is optional.
# In Flask, the current request is always accessible as flask.request
success, result = await graphql(schema, data, context_value=request)
status_code = 200
if "errors" in result:
status_codes = [
error.get("extensions", {}).get("code", 200) for error in result["errors"]
]
status_code = max(status_codes, default=200)
return jsonify(result), status_code

View File

@ -0,0 +1,18 @@
from flask import redirect
from api.route import Route
from core.logger import Logger
from data.schemas.public.short_url import ShortUrl
from data.schemas.public.short_url_dao import shortUrlDao
BasePath = f"/"
logger = Logger(__name__)
@Route.get(f"{BasePath}/<path:path>")
async def handle_short_url(path: str):
from_db = await shortUrlDao.find_single_by({ShortUrl.short_url: path})
if from_db is None:
return {"error": "Short URL not found"}, 404
return redirect(from_db.target_url)

25
api/src/api/routes/ui.py Normal file
View File

@ -0,0 +1,25 @@
from ariadne.explorer import ExplorerPlayground
from api.route import Route
from core.environment import Environment
from core.logger import Logger
BasePath = f"/ui"
logger = Logger(__name__)
@Route.get(f"{BasePath}/playground")
@Route.authorize(skip_in_dev=True)
async def playground():
if Environment.get_environment() != "development":
return "", 403
request_global_headers = {}
dev_user = Environment.get("DEV_USER", str)
if dev_user:
request_global_headers = {f"Authorization": f"DEV-User {dev_user}"}
return (
ExplorerPlayground(request_global_headers=request_global_headers).html(None),
200,
)

View File

@ -0,0 +1,7 @@
from api.route import Route
from version import VERSION
@Route.get(f"/api/version")
def version():
return VERSION

View File

View File

View File

@ -0,0 +1,62 @@
from abc import ABC
from typing import Optional, Type
from core.typing import FilterOperator
class CollectionFilterABC[T](ABC):
def __init__(
self,
obj: Optional[dict],
op: Optional[FilterOperator] = None,
source_value: Optional[T] = None,
):
ABC.__init__(self)
self._obj = obj
self._op: Optional[FilterOperator] = op
self._source_value: Optional[T] = source_value
self._resolvers: {} = {}
def add_field(self, field: str, filter_type: Type["CollectionFilterABC"]):
if field not in self._obj:
return
operator, value = next(iter(self._obj[field].items()))
self._resolvers[field] = filter_type({field: value}, operator, value)
def filter(self, x: T) -> bool:
collected = []
for field, resolver in self._resolvers.items():
collected.append(resolver.resolve(getattr(x, field)))
return all(collected)
def resolve(self, i_value: T) -> bool:
match self._op:
case "equal":
return self._source_value == i_value
case "notEqual":
return self._source_value != i_value
case "greater":
return self._source_value > i_value
case "greaterOrEqual":
return self._source_value >= i_value
case "less":
return self._source_value < i_value
case "lessOrEqual":
return self._source_value <= i_value
case "isNull":
return self._source_value is None
case "isNotNull":
return self._source_value is not None
case "contains":
return self._source_value in i_value
case "notContains":
return self._source_value not in i_value
case "startsWith":
return i_value.startswith(self._source_value)
case "endsWith":
return i_value.endswith(self._source_value)
case _:
raise Exception("Invalid operation")

View File

@ -0,0 +1,27 @@
from typing import Optional
from api_graphql.abc.collection_filter_abc import CollectionFilterABC, FilterOperator
from api_graphql.abc.filter.collection.bool_collection_filter import (
BoolCollectionFilter,
)
from api_graphql.abc.filter.collection.date_collection_filter import (
DateCollectionFilter,
)
from api_graphql.abc.filter.collection.int_collection_filter import IntCollectionFilter
class DbModelCollectionFilterABC[T](CollectionFilterABC):
def __init__(
self,
obj: Optional[dict],
op: Optional[FilterOperator] = None,
source_value: Optional[T] = None,
):
CollectionFilterABC.__init__(self, obj, op, source_value)
self._obj = obj
self.add_field("id", IntCollectionFilter)
self.add_field("deleted", BoolCollectionFilter)
self.add_field("editor", IntCollectionFilter)
self.add_field("createdUtc", DateCollectionFilter)
self.add_field("updatedUtc", DateCollectionFilter)

View File

@ -0,0 +1,20 @@
from typing import Optional
from api_graphql.abc.filter.bool_filter import BoolFilter
from api_graphql.abc.filter.int_filter import IntFilter
from api_graphql.abc.filter.string_filter import StringFilter
from api_graphql.abc.filter_abc import FilterABC
class DbModelFilterABC[T](FilterABC[T]):
def __init__(
self,
obj: Optional[dict],
):
FilterABC.__init__(self, obj)
self.add_field("id", IntFilter)
self.add_field("deleted", BoolFilter)
self.add_field("editor", IntFilter)
self.add_field("createdUtc", StringFilter, "created")
self.add_field("updatedUtc", StringFilter, "updated")

View File

@ -0,0 +1,18 @@
from api_graphql.abc.query_abc import QueryABC
from data.schemas.administration.user import User
class DbModelQueryABC(QueryABC):
def __init__(self, name: str = __name__):
QueryABC.__init__(self, name)
self.set_field("id", lambda x, *_: x.id)
self.set_field("deleted", lambda x, *_: x.deleted)
self.set_field("editor", self.__get_editor)
self.set_field("createdUtc", lambda x, *_: x.created)
self.set_field("updatedUtc", lambda x, *_: x.updated)
@staticmethod
async def __get_editor(x: User, *_):
return await x.editor

View File

@ -0,0 +1,37 @@
from abc import ABC
from typing import Optional
from api_graphql.typing import TRequireAny
from service.permission.permissions_enum import Permissions
class FieldABC(ABC):
def __init__(
self,
name: str,
require_any_permission: list[Permissions] = None,
require_any: TRequireAny = None,
public: bool = False,
):
self._name = name
self._require_any_permission = require_any_permission
self._require_any = require_any
self._public = public
@property
def name(self) -> str:
return self._name
@property
def require_any_permission(self) -> Optional[list[Permissions]]:
return self._require_any_permission
@property
def require_any(self) -> Optional[TRequireAny]:
return self._require_any
@property
def public(self) -> bool:
return self._public

View File

@ -0,0 +1,42 @@
from abc import abstractmethod, ABC
from typing import Self
from api_graphql.abc.field_abc import FieldABC
from api_graphql.typing import TRequireAnyPermissions, TRequireAnyResolvers
from service.permission.permissions_enum import Permissions
class FieldBuilderABC(ABC):
def __init__(self, name: str):
ABC.__init__(self)
self._name = name
self._require_any_permission = None
self._require_any = None
self._public = False
def with_require_any_permission(
self, require_any_permission: list[Permissions]
) -> Self:
assert (
require_any_permission is not None
), "require_any_permission cannot be None"
self._require_any_permission = require_any_permission
return self
def with_require_any(
self, permissions: TRequireAnyPermissions, resolvers: TRequireAnyResolvers
) -> Self:
assert permissions is not None, "permissions cannot be None"
assert resolvers is not None, "resolvers cannot be None"
self._require_any = (permissions, resolvers)
return self
def with_public(self, public: bool = False) -> Self:
self._public = public
return self
@abstractmethod
def build(self) -> FieldABC:
pass

View File

@ -0,0 +1,16 @@
from typing import Optional
from api_graphql.abc.filter_abc import FilterABC
class BoolFilter(FilterABC):
def __init__(
self,
obj: Optional[dict], # {'equal': value}
):
FilterABC.__init__(self, obj)
self.add_field("equal", bool)
self.add_field("notEqual", bool)
self.add_field("isNull", bool)
self.add_field("isNotNull", bool)

View File

@ -0,0 +1,15 @@
from typing import Optional
from api_graphql.abc.collection_filter_abc import CollectionFilterABC
from core.typing import T, BoolFilterOperator
class BoolCollectionFilter(CollectionFilterABC):
def __init__(
self,
obj: Optional[dict],
op: Optional[BoolFilterOperator] = None,
source_value: Optional[T] = None,
):
CollectionFilterABC.__init__(self, obj, op, source_value)

View File

@ -0,0 +1,15 @@
from typing import Optional
from api_graphql.abc.collection_filter_abc import CollectionFilterABC
from core.typing import T, DateFilterOperator
class DateCollectionFilter(CollectionFilterABC):
def __init__(
self,
obj: Optional[dict],
op: Optional[DateFilterOperator] = None,
source_value: Optional[T] = None,
):
CollectionFilterABC.__init__(self, obj, op, source_value)

View File

@ -0,0 +1,15 @@
from typing import Optional
from api_graphql.abc.collection_filter_abc import CollectionFilterABC
from core.typing import T, IntFilterOperator
class IntCollectionFilter(CollectionFilterABC):
def __init__(
self,
obj: Optional[dict],
op: Optional[IntFilterOperator] = None,
source_value: Optional[T] = None,
):
CollectionFilterABC.__init__(self, obj, op, source_value)

View File

@ -0,0 +1,15 @@
from typing import Optional
from api_graphql.abc.collection_filter_abc import CollectionFilterABC
from core.typing import T, StringFilterOperator
class StringCollectionFilter(CollectionFilterABC):
def __init__(
self,
obj: Optional[dict],
op: Optional[StringFilterOperator] = None,
source_value: Optional[T] = None,
):
CollectionFilterABC.__init__(self, obj, op, source_value)

View File

@ -0,0 +1,20 @@
from typing import Optional
from api_graphql.abc.filter_abc import FilterABC
class DateFilter(FilterABC):
def __init__(
self,
obj: Optional[dict], # {'equal': value}
):
FilterABC.__init__(self, obj)
self.add_field("equal", str)
self.add_field("notEqual", str)
self.add_field("greater", str)
self.add_field("greaterOrEqual", str)
self.add_field("less", str)
self.add_field("lessOrEqual", str)
self.add_field("isNull", str)
self.add_field("isNotNull", str)

View File

@ -0,0 +1,20 @@
from typing import Optional
from api_graphql.abc.filter_abc import FilterABC
class IntFilter(FilterABC):
def __init__(
self,
obj: Optional[dict], # {'equal': value}
):
FilterABC.__init__(self, obj)
self.add_field("equal", int)
self.add_field("notEqual", int)
self.add_field("greater", int)
self.add_field("greaterOrEqual", int)
self.add_field("less", int)
self.add_field("lessOrEqual", int)
self.add_field("isNull", int)
self.add_field("isNotNull", int)

View File

@ -0,0 +1,20 @@
from typing import Optional
from api_graphql.abc.filter_abc import FilterABC
class StringFilter(FilterABC):
def __init__(
self,
obj: Optional[dict], # {'equal': value}
):
FilterABC.__init__(self, obj)
self.add_field("equal", str)
self.add_field("notEqual", str)
self.add_field("contains", str)
self.add_field("notContains", str)
self.add_field("startsWith", str)
self.add_field("endsWith", str)
self.add_field("isNull", str)
self.add_field("isNotNull", str)

View File

@ -0,0 +1,39 @@
from abc import ABC
from datetime import datetime
from typing import Optional, Union, Type
from core.typing import AttributeFilter
class FilterABC[T](ABC):
def __init__(
self,
obj: Optional[dict],
):
ABC.__init__(self)
self._obj = obj
self._values = {}
def add_field(
self,
field: str,
filter_type: Union[Type["FilterABC"], Type[Union[int, str, bool, datetime]]],
db_name=None,
):
if field not in self._obj:
return
if db_name is None:
db_name = field
if issubclass(filter_type, FilterABC):
f = filter_type(self._obj[field])
self._values[db_name] = f.to_attribute_filter()
return
self._values[db_name] = filter_type(self._obj[field])
def to_attribute_filter(self) -> AttributeFilter:
return self._values

View File

@ -0,0 +1,31 @@
from abc import ABC
from typing import Optional, Type, get_origin, get_args
from core.typing import T
class InputABC(ABC):
def __init__(
self,
src: Optional[dict],
):
ABC.__init__(self)
self._src = src
def option(
self, field: str, cast_type: Type[T], default=None, required=False
) -> Optional[T]:
if required and field not in self._src:
raise ValueError(f"{field} is required")
if field not in self._src:
return default
value = self._src[field]
if get_origin(cast_type) == list:
sub_type = get_args(cast_type)[0]
return [sub_type(item) for item in value]
return cast_type(value)
def get(self, field: str, default=None) -> Optional[T]:
return self._src.get(field, default)

View File

@ -0,0 +1,41 @@
from abc import abstractmethod
from api_graphql.abc.query_abc import QueryABC
from api_graphql.field.mutation_field_builder import MutationFieldBuilder
from service.permission.permissions_enum import Permissions
class MutationABC(QueryABC):
__abstract__ = True
@abstractmethod
def __init__(self, name: str = __name__):
QueryABC.__init__(self, f"{name}Mutation")
def add_mutation_type(
self,
name: str,
mutation_name: str,
require_any_permission: list[Permissions] = None,
public: bool = False,
):
"""
Add mutation type (sub mutation) to the mutation object
:param str name: GraphQL mutation name
:param str mutation_name: Internal (class) mutation name without "Mutation" suffix
:param list[Permissions] require_any_permission: List of permissions required to access the field
:param bool public: Define if the field can resolve without authentication
:return:
"""
from api_graphql.definition import QUERIES
self.field(
MutationFieldBuilder(name)
.with_resolver(
lambda *args, **kwargs: [
x for x in QUERIES if x.name == f"{mutation_name}Mutation"
][0]
)
.with_require_any_permission(require_any_permission)
.with_public(public)
)

View File

@ -0,0 +1,303 @@
from abc import abstractmethod
from asyncio import iscoroutinefunction
from enum import Enum
from types import NoneType
from typing import Callable, Type, get_args, Any, Union
from ariadne import ObjectType
from graphql import GraphQLResolveInfo
from typing_extensions import deprecated
from api.route import Route
from api_graphql.abc.collection_filter_abc import CollectionFilterABC
from api_graphql.abc.input_abc import InputABC
from api_graphql.abc.sort_abc import Sort
from api_graphql.field.collection_field import CollectionField
from api_graphql.field.collection_field_builder import CollectionFieldBuilder
from api_graphql.field.dao_field import DaoField
from api_graphql.field.dao_field_builder import DaoFieldBuilder
from api_graphql.field.mutation_field import MutationField
from api_graphql.field.mutation_field_builder import MutationFieldBuilder
from api_graphql.field.resolver_field import ResolverField
from api_graphql.field.resolver_field_builder import ResolverFieldBuilder
from api_graphql.service.collection_result import CollectionResult
from api_graphql.service.exceptions import (
UnauthorizedException,
PermissionDeniedException,
AccessDenied,
)
from api_graphql.service.query_context import QueryContext
from api_graphql.typing import TRequireAnyPermissions, TRequireAnyResolvers
from core.logger import APILogger
from service.permission.permissions_enum import Permissions
logger = APILogger(__name__)
class QueryABC(ObjectType):
__abstract__ = True
@abstractmethod
def __init__(self, name: str = __name__):
ObjectType.__init__(self, name)
@staticmethod
async def _authorize():
if not await Route.is_authorized():
raise UnauthorizedException()
@staticmethod
async def _require_any_permission(permissions: list[Permissions]):
if permissions is None or len(permissions) == 0:
return
authenticated = await Route.get_authenticated_user_or_api_key()
if all([await authenticated.has_permission(x) for x in permissions]):
return
raise PermissionDeniedException([x.value for x in permissions])
@classmethod
async def _require_any(
cls,
data: Any,
permissions: TRequireAnyPermissions,
resolvers: TRequireAnyResolvers,
*args,
**kwargs,
):
if len(permissions) > 0:
user = await Route.get_authenticated_user_or_api_key_or_default()
if user is not None and all(
[await user.has_permission(x) for x in permissions]
):
return
for x in resolvers:
user = await Route.get_authenticated_user_or_api_key_or_default()
user_permissions = []
if user is not None:
user_permissions = await user.permissions
if iscoroutinefunction(x):
result = await x(
QueryContext(data, user, user_permissions, *args, **kwargs)
)
else:
result = x(QueryContext(data, user, user_permissions, *args, **kwargs))
if not result:
raise AccessDenied()
def field(
self,
builder: Union[
DaoFieldBuilder,
CollectionFieldBuilder,
ResolverFieldBuilder,
MutationFieldBuilder,
],
):
"""
Add a field to the query
:param FieldBuilder builder: Field builder
:return:
"""
field = builder.build()
resolver = lambda *args, **kwargs: []
async def dao_wrapper(*args, **kwargs):
filters = []
sorts = []
take = None
skip = None
if field.filter_type and "filter" in kwargs:
in_filters = kwargs["filter"]
if not isinstance(in_filters, list):
in_filters = [in_filters]
for f in in_filters:
filters.append(field.filter_type(f).to_attribute_filter())
if field.sort_type and "sort" in kwargs:
sorts = kwargs["sort"]
if "take" in kwargs:
take = kwargs["take"]
if "skip" in kwargs:
skip = kwargs["skip"]
collection = await field.dao.find_by(filters, sorts, take, skip)
res = CollectionResult(await field.dao.count(), len(collection), collection)
return res
async def collection_wrapper(*args, **kwargs):
if "filter" in kwargs:
kwargs["filters"] = kwargs["filter"]
del kwargs["filter"]
if field.filter_type and "filters" in kwargs:
filters = []
for f in kwargs["filters"]:
filters.append(field.filter_type(f))
kwargs["filters"] = filters
else:
kwargs["filters"] = None
if field.sort_type and "sort" in kwargs:
sorts = []
for s in kwargs["sort"]:
sorts.append(field.sort_type(get_args(field.sort_type)[0], s))
kwargs["sort"] = sorts
else:
kwargs["sort"] = None
if iscoroutinefunction(field.resolver):
collection = await field.collection_resolver(*args)
else:
collection = field.collection_resolver(*args)
return self._resolve_collection(
collection,
*args,
**kwargs,
)
async def resolver_wrapper(*args, **kwargs):
return (
await field.resolver(*args, **kwargs)
if iscoroutinefunction(field.resolver)
else field.resolver(*args, **kwargs)
)
if isinstance(field, DaoField):
resolver = dao_wrapper
elif isinstance(field, CollectionField):
resolver = collection_wrapper
elif isinstance(field, ResolverField):
resolver = resolver_wrapper
elif isinstance(field, MutationField):
async def input_wrapper(
mutation: QueryABC, info: GraphQLResolveInfo, **kwargs
):
if field.input_type is None:
return await resolver_wrapper(mutation, info, **kwargs)
logger.debug(
f"{field.name}: {field.input_type.__name__} {kwargs[field.input_key]}"
)
input_obj = field.input_type(kwargs[field.input_key])
del kwargs[field.input_key]
return await resolver_wrapper(input_obj, mutation, info, **kwargs)
resolver = input_wrapper
else:
raise ValueError(f"Unknown field type: {field.name}")
async def wrapper(*args, **kwargs):
if not field.public:
await self._authorize()
if (
field.require_any is None
and not field.public
and field.require_any_permission
):
await self._require_any_permission(field.require_any_permission)
result = await resolver(*args, **kwargs)
if field.require_any is not None:
await self._require_any(result, *field.require_any, *args, **kwargs)
return result
self.set_field(field.name, wrapper)
@deprecated("Use field(FieldBuilder()) instead")
def mutation(
self,
name: str,
f: Callable,
input_type: Type[InputABC] = None,
input_key: str = "input",
require_any_permission: list[Permissions] = None,
public: bool = False,
):
"""
Adds a mutation to the query
:param str name: Name of the graphql field
:param Callable f: Mutation function
:param Type[InputABC] input_type: Type of the input
:param str input_key: Key of the input in the arguments
:param list[Permissions] require_any_permission: List of permissions required to access the field
:param bool public: Define if the field can resolve without authentication
:return:
"""
self.field(
MutationFieldBuilder(name)
.with_resolver(f)
.with_input(input_type, input_key)
.with_require_any_permission(require_any_permission)
.with_public(public)
)
@classmethod
def _resolve_collection(
cls,
collection: list,
*_,
filters: list[CollectionFilterABC] = None,
sort: list[Sort] = None,
skip: int = None,
take: int = None,
) -> CollectionResult:
total_count = len(collection)
if filters is not None and len(filters) > 0:
for f in filters:
collection = list(filter(lambda x: f.filter(x), collection))
if sort is not None:
def f_sort(x: object, k: str):
attr = getattr(x, k)
if isinstance(attr, Enum):
return attr.value
if isinstance(attr, NoneType):
return 0
return attr
for s in reversed(
sort
): # Apply sorting in reverse order to make first primary "orderBy" and other secondary "thenBy"
attrs = [a for a in dir(s) if not a.startswith("_")]
for k in attrs:
collection = sorted(
collection,
key=lambda x: f_sort(x, k),
reverse=getattr(s, k) == "DESC",
)
if skip is not None:
collection = collection[skip:]
if take is not None:
collection = collection[:take]
return CollectionResult(total_count, len(collection), collection)

View File

@ -0,0 +1,16 @@
import functools
from typing import Generic
from core.typing import T
class Sort(Generic[T]):
def __init__(self, _t: type, sort: dict):
for key in sort:
self.__setattr__(key, sort[key])
def _rgetattr(self, attr, *args):
def _getattr(self, attr):
return getattr(self, attr, *args)
return functools.reduce(_getattr, [self] + attr.split("."))

View File

@ -0,0 +1,28 @@
import importlib
import os
from api_graphql.abc.db_model_query_abc import DbModelQueryABC
from api_graphql.abc.mutation_abc import MutationABC
from api_graphql.abc.query_abc import QueryABC
from api_graphql.query import Query
# used to import all queries and mutations
def import_graphql_schema_part(part: str):
routes_dir = os.path.join(os.path.dirname(__file__), part)
for filename in os.listdir(routes_dir):
if filename.endswith(".py") and filename != "__init__.py":
module_name = f"api_graphql.{part}.{filename[:-3]}"
importlib.import_module(module_name)
import_graphql_schema_part("queries")
import_graphql_schema_part("mutations")
sub_query_classes = [DbModelQueryABC, MutationABC]
query_classes = [
*[y for x in sub_query_classes for y in x.__subclasses__()],
*[x for x in QueryABC.__subclasses__() if x not in sub_query_classes],
]
QUERIES = [x() for x in query_classes if x != Query]

View File

View File

@ -0,0 +1,42 @@
from typing import Type, Optional, Callable
from api_graphql.abc.collection_filter_abc import CollectionFilterABC
from api_graphql.abc.field_abc import FieldABC
from api_graphql.typing import TRequireAny
from core.typing import T
from service.permission.permissions_enum import Permissions
class CollectionField(FieldABC):
def __init__(
self,
name: str,
require_any_permission: list[Permissions] = None,
require_any: TRequireAny = None,
public: bool = False,
collection_resolver: Callable = None,
filter_type: Type[CollectionFilterABC] = None,
sort_type: Type[T] = None,
):
FieldABC.__init__(self, name, require_any_permission, require_any, public)
self._name = name
self._public = public
self._collection_resolver = collection_resolver
self._filter_type = filter_type
self._sort_type = sort_type
@property
def collection_resolver(self) -> Optional[Callable]:
return self._collection_resolver
@property
def filter_type(
self,
) -> Type[CollectionFilterABC]:
return self._filter_type
@property
def sort_type(self) -> Optional[Type[T]]:
return self._sort_type

View File

@ -0,0 +1,46 @@
from typing import Type, Self, Callable
from api_graphql.abc.collection_filter_abc import CollectionFilterABC
from api_graphql.abc.field_builder_abc import FieldBuilderABC
from api_graphql.field.collection_field import CollectionField
from core.typing import T
class CollectionFieldBuilder(FieldBuilderABC):
def __init__(self, name: str):
FieldBuilderABC.__init__(self, name)
self._collection_resolver = None
self._filter_type = None
self._sort_type = None
def with_collection_resolver(self, collection_resolver: Callable) -> Self:
assert collection_resolver is not None, "collection_resolver cannot be None"
self._collection_resolver = collection_resolver
return self
def with_filter(self, filter_type: Type[CollectionFilterABC]) -> Self:
assert filter_type is not None, "filter cannot be None"
self._filter_type = filter_type
return self
def with_sort(self, sort_type: Type[T]) -> Self:
assert sort_type is not None, "sort cannot be None"
self._sort_type = sort_type
return self
def build(self) -> CollectionField:
assert (
self._collection_resolver is not None
), "collection_resolver cannot be None"
return CollectionField(
self._name,
self._require_any_permission,
self._require_any,
self._public,
self._collection_resolver,
self._filter_type,
self._sort_type,
)

View File

@ -0,0 +1,44 @@
from typing import Union, Type, Optional
from api_graphql.abc.collection_filter_abc import CollectionFilterABC
from api_graphql.abc.field_abc import FieldABC
from api_graphql.abc.filter_abc import FilterABC
from api_graphql.typing import TRequireAny
from core.database.abc.data_access_object_abc import DataAccessObjectABC
from core.typing import T
from service.permission.permissions_enum import Permissions
class DaoField(FieldABC):
def __init__(
self,
name: str,
require_any_permission: list[Permissions] = None,
require_any: TRequireAny = None,
public: bool = False,
dao: DataAccessObjectABC = None,
filter_type: Type[FilterABC] = None,
sort_type: Type[T] = None,
):
FieldABC.__init__(self, name, require_any_permission, require_any, public)
self._name = name
self._public = public
self._dao = dao
self._filter_type = filter_type
self._sort_type = sort_type
@property
def dao(self) -> Optional[DataAccessObjectABC]:
return self._dao
@property
def filter_type(
self,
) -> Optional[Type[FilterABC]]:
return self._filter_type
@property
def sort_type(self) -> Optional[Type[T]]:
return self._sort_type

View File

@ -0,0 +1,44 @@
from typing import Type, Self
from api_graphql.abc.field_builder_abc import FieldBuilderABC
from api_graphql.abc.filter_abc import FilterABC
from api_graphql.field.dao_field import DaoField
from core.database.abc.data_access_object_abc import DataAccessObjectABC
from core.typing import T
class DaoFieldBuilder(FieldBuilderABC):
def __init__(self, name: str):
FieldBuilderABC.__init__(self, name)
self._dao = None
self._filter_type = None
self._sort_type = None
def with_dao(self, dao: DataAccessObjectABC) -> Self:
assert dao is not None, "dao cannot be None"
self._dao = dao
return self
def with_filter(self, filter_type: Type[FilterABC]) -> Self:
assert filter_type is not None, "filter cannot be None"
self._filter_type = filter_type
return self
def with_sort(self, sort_type: Type[T]) -> Self:
assert sort_type is not None, "sort cannot be None"
self._sort_type = sort_type
return self
def build(self) -> DaoField:
assert self._dao is not None, "dao cannot be None"
return DaoField(
self._name,
self._require_any_permission,
self._require_any,
self._public,
self._dao,
self._filter_type,
self._sort_type,
)

View File

@ -0,0 +1,39 @@
from typing import Optional, Type
from ariadne.types import Resolver
from api_graphql.abc.field_abc import FieldABC
from api_graphql.abc.input_abc import InputABC
from api_graphql.typing import TRequireAny
from service.permission.permissions_enum import Permissions
class MutationField(FieldABC):
def __init__(
self,
name: str,
require_any_permission: list[Permissions] = None,
require_any: TRequireAny = None,
public: bool = False,
resolver: Resolver = None,
input_type: Type[InputABC] = None,
input_key: str = "input",
):
FieldABC.__init__(self, name, require_any_permission, require_any, public)
self._resolver = resolver
self._input_type = input_type
self._input_key = input_key
@property
def resolver(self) -> Optional[Resolver]:
return self._resolver
@property
def input_type(self) -> Optional[Type[InputABC]]:
return self._input_type
@property
def input_key(self) -> str:
return self._input_key

View File

@ -0,0 +1,39 @@
from typing import Self, Type
from ariadne.types import Resolver
from api_graphql.abc.field_builder_abc import FieldBuilderABC
from api_graphql.abc.input_abc import InputABC
from api_graphql.field.mutation_field import MutationField
class MutationFieldBuilder(FieldBuilderABC):
def __init__(self, name: str):
FieldBuilderABC.__init__(self, name)
self._resolver = None
self._input_type = None
self._input_key = None
def with_resolver(self, resolver: Resolver) -> Self:
assert resolver is not None, "resolver cannot be None"
self._resolver = resolver
return self
def with_input(self, input_type: Type[InputABC], input_key: str = None) -> Self:
self._input_type = input_type
self._input_key = input_key
return self
def build(self) -> MutationField:
assert self._resolver is not None, "resolver cannot be None"
return MutationField(
self._name,
self._require_any_permission,
self._require_any,
self._public,
self._resolver,
self._input_type,
self._input_key,
)

View File

@ -0,0 +1,26 @@
from typing import Optional
from ariadne.types import Resolver
from api_graphql.abc.field_abc import FieldABC
from api_graphql.typing import TRequireAny
from service.permission.permissions_enum import Permissions
class ResolverField(FieldABC):
def __init__(
self,
name: str,
require_any_permission: list[Permissions] = None,
require_any: TRequireAny = None,
public: bool = False,
resolver: Resolver = None,
):
FieldABC.__init__(self, name, require_any_permission, require_any, public)
self._resolver = resolver
@property
def resolver(self) -> Optional[Resolver]:
return self._resolver

View File

@ -0,0 +1,29 @@
from typing import Self
from ariadne.types import Resolver
from api_graphql.abc.field_builder_abc import FieldBuilderABC
from api_graphql.field.resolver_field import ResolverField
class ResolverFieldBuilder(FieldBuilderABC):
def __init__(self, name: str):
FieldBuilderABC.__init__(self, name)
self._resolver = None
def with_resolver(self, resolver: Resolver) -> Self:
assert resolver is not None, "resolver cannot be None"
self._resolver = resolver
return self
def build(self) -> ResolverField:
assert self._resolver is not None, "resolver cannot be None"
return ResolverField(
self._name,
self._require_any_permission,
self._require_any,
self._public,
self._resolver,
)

View File

View File

@ -0,0 +1,12 @@
from api_graphql.abc.db_model_filter_abc import DbModelFilterABC
from api_graphql.abc.filter.string_filter import StringFilter
class ApiKeyFilter(DbModelFilterABC):
def __init__(
self,
obj: dict,
):
DbModelFilterABC.__init__(self, obj)
self.add_field("identifier", StringFilter)

View File

@ -0,0 +1,18 @@
from typing import Optional
from api_graphql.abc.db_model_collection_filter_abc import DbModelCollectionFilterABC
from api_graphql.abc.filter.string_filter import StringCollectionFilter
from api_graphql.abc.collection_filter_abc import FilterOperator
from core.typing import T
class ApiKeyCollectionFilter(DbModelCollectionFilterABC):
def __init__(
self,
obj: dict,
op: Optional[FilterOperator] = None,
source_value: Optional[T] = None,
):
DbModelCollectionFilterABC.__init__(self, obj, op, source_value)
self.add_field("identifier", StringCollectionFilter)

View File

@ -0,0 +1,21 @@
from typing import Optional
from api_graphql.abc.db_model_collection_filter_abc import DbModelCollectionFilterABC
from api_graphql.abc.filter.string_filter import StringCollectionFilter
from api_graphql.abc.collection_filter_abc import FilterOperator
from core.typing import T
class IpListFilter(DbModelCollectionFilterABC):
def __init__(
self,
obj: dict,
op: Optional[FilterOperator] = None,
source_value: Optional[T] = None,
):
DbModelCollectionFilterABC.__init__(self, obj, op, source_value)
self.add_field("ip", StringCollectionFilter)
self.add_field("description", StringCollectionFilter)
self.add_field("mac", StringCollectionFilter)
self.add_field("dns", StringCollectionFilter)

View File

@ -0,0 +1,20 @@
from typing import Optional
from api_graphql.abc.db_model_collection_filter_abc import DbModelCollectionFilterABC
from api_graphql.abc.filter.bool_filter import BoolCollectionFilter
from api_graphql.abc.filter.string_filter import StringCollectionFilter
from api_graphql.abc.collection_filter_abc import FilterOperator
from core.typing import T
class NewsFilter(DbModelCollectionFilterABC):
def __init__(
self,
obj: dict,
op: Optional[FilterOperator] = None,
source_value: Optional[T] = None,
):
DbModelCollectionFilterABC.__init__(self, obj, op, source_value)
self.add_field("title", StringCollectionFilter)
self.add_field("published", BoolCollectionFilter)

View File

@ -0,0 +1,19 @@
from typing import Optional
from api_graphql.abc.db_model_collection_filter_abc import DbModelCollectionFilterABC
from api_graphql.abc.filter.string_filter import StringCollectionFilter
from api_graphql.abc.collection_filter_abc import FilterOperator
from core.typing import T
class PermissionFilter(DbModelCollectionFilterABC):
def __init__(
self,
obj: dict,
op: Optional[FilterOperator] = None,
source_value: Optional[T] = None,
):
DbModelCollectionFilterABC.__init__(self, obj, op, source_value)
self.add_field("name", StringCollectionFilter)
self.add_field("description", StringCollectionFilter)

View File

@ -0,0 +1,19 @@
from typing import Optional
from api_graphql.abc.db_model_collection_filter_abc import DbModelCollectionFilterABC
from api_graphql.abc.filter.string_filter import StringCollectionFilter
from api_graphql.abc.collection_filter_abc import FilterOperator
from core.typing import T
class RoleFilter(DbModelCollectionFilterABC):
def __init__(
self,
obj: dict,
op: Optional[FilterOperator] = None,
source_value: Optional[T] = None,
):
DbModelCollectionFilterABC.__init__(self, obj, op, source_value)
self.add_field("name", StringCollectionFilter)
self.add_field("description", StringCollectionFilter)

View File

@ -0,0 +1,20 @@
from typing import Optional
from api_graphql.abc.db_model_collection_filter_abc import DbModelCollectionFilterABC
from api_graphql.abc.filter.string_filter import StringCollectionFilter
from api_graphql.abc.collection_filter_abc import FilterOperator
from core.typing import T
class UserFilter(DbModelCollectionFilterABC):
def __init__(
self,
obj: dict,
op: Optional[FilterOperator] = None,
source_value: Optional[T] = None,
):
DbModelCollectionFilterABC.__init__(self, obj, op, source_value)
self.add_field("keycloakId", StringCollectionFilter)
self.add_field("username", StringCollectionFilter)
self.add_field("email", StringCollectionFilter)

View File

@ -0,0 +1,13 @@
from api_graphql.abc.db_model_filter_abc import DbModelFilterABC
from api_graphql.abc.filter.string_filter import StringFilter
class GroupFilter(DbModelFilterABC):
def __init__(
self,
obj: dict,
):
DbModelFilterABC.__init__(self, obj)
self.add_field("name", StringFilter)
self.add_field("description", StringFilter)

View File

@ -0,0 +1,13 @@
from api_graphql.abc.db_model_filter_abc import DbModelFilterABC
from api_graphql.abc.filter.string_filter import StringFilter
class PermissionFilter(DbModelFilterABC):
def __init__(
self,
obj: dict,
):
DbModelFilterABC.__init__(self, obj)
self.add_field("name", StringFilter)
self.add_field("description", StringFilter)

View File

@ -0,0 +1,13 @@
from api_graphql.abc.db_model_filter_abc import DbModelFilterABC
from api_graphql.abc.filter.string_filter import StringFilter
class RoleFilter(DbModelFilterABC):
def __init__(
self,
obj: dict,
):
DbModelFilterABC.__init__(self, obj)
self.add_field("name", StringFilter)
self.add_field("description", StringFilter)

View File

@ -0,0 +1,14 @@
from api_graphql.abc.db_model_filter_abc import DbModelFilterABC
from api_graphql.abc.filter.string_filter import StringFilter
class ShortUrlFilter(DbModelFilterABC):
def __init__(
self,
obj: dict,
):
DbModelFilterABC.__init__(self, obj)
self.add_field("short_url", StringFilter)
self.add_field("target_url", StringFilter)
self.add_field("description", StringFilter)

View File

@ -0,0 +1,14 @@
from api_graphql.abc.db_model_filter_abc import DbModelFilterABC
from api_graphql.abc.filter.string_filter import StringFilter
class UserFilter(DbModelFilterABC):
def __init__(
self,
obj: dict,
):
DbModelFilterABC.__init__(self, obj)
self.add_field("keycloakId", StringFilter)
self.add_field("username", StringFilter)
self.add_field("email", StringFilter)

View File

@ -0,0 +1,55 @@
type ApiKeyResult {
totalCount: Int
count: Int
nodes: [ApiKey]
}
type ApiKey implements DbModel {
id: ID
identifier: String
key: String
permissions: [Permission]
deleted: Boolean
editor: User
createdUtc: String
updatedUtc: String
}
input ApiKeySort {
id: SortOrder
identifier: SortOrder
deleted: SortOrder
editorId: SortOrder
createdUtc: SortOrder
updatedUtc: SortOrder
}
input ApiKeyFilter {
id: IntFilter
identifier: StringFilter
deleted: BooleanFilter
editorId: IntFilter
createdUtc: DateFilter
updatedUtc: DateFilter
}
type ApiKeyMutation {
create(input: ApiKeyCreateInput!): ApiKey
update(input: ApiKeyUpdateInput!): ApiKey
delete(identifier: String!): Boolean
restore(identifier: String!): Boolean
}
input ApiKeyCreateInput {
identifier: String
permissions: [ID]
}
input ApiKeyUpdateInput {
id: ID!
identifier: String
permissions: [ID]
}

View File

@ -0,0 +1,66 @@
scalar Upload
interface DbModel {
id: ID
deleted: Boolean
editor: User
createdUtc: String
updatedUtc: String
}
enum SortOrder {
ASC
DESC
}
interface DbModelSort {
deleted: SortOrder
editor: SortOrder
created: SortOrder
updated: SortOrder
}
input StringFilter {
equal: String
notEqual: String
contains: String
notContains: String
startsWith: String
endsWith: String
isNull: String
isNotNull: String
}
input IntFilter {
equal: Int
notEqual: Int
greater: Int
greaterOrEqual: Int
less: Int
lessOrEqual: Int
isNull: Int
isNotNull: Int
}
input BooleanFilter {
equal: Boolean
notEqual: Int
isNull: Int
isNotNull: Int
}
input DateFilter {
equal: String
notEqual: String
contains: String
notContains: String
isNull: String
isNotNull: String
}

View File

@ -0,0 +1,44 @@
type GroupResult {
totalCount: Int
count: Int
nodes: [Group]
}
type Group implements DbModel {
id: ID
name: String
description: String
deleted: Boolean
editor: User
createdUtc: String
updatedUtc: String
}
input GroupSort {
id: SortOrder
name: SortOrder
description: SortOrder
deleted: SortOrder
editorId: SortOrder
createdUtc: SortOrder
updatedUtc: SortOrder
}
input GroupFilter {
id: IntFilter
name: StringFilter
description: StringFilter
deleted: BooleanFilter
editor: IntFilter
createdUtc: DateFilter
updatedUtc: DateFilter
}
input GroupInput {
id: ID
name: String
description: String
}

View File

@ -0,0 +1,6 @@
type Mutation {
apiKey: ApiKeyMutation
user: UserMutation
role: RoleMutation
}

View File

@ -0,0 +1,44 @@
type PermissionResult {
totalCount: Int
count: Int
nodes: [Permission]
}
type Permission implements DbModel {
id: ID
name: String
description: String
deleted: Boolean
editor: User
createdUtc: String
updatedUtc: String
}
input PermissionSort {
id: SortOrder
name: SortOrder
description: SortOrder
deleted: SortOrder
editorId: SortOrder
createdUtc: SortOrder
updatedUtc: SortOrder
}
input PermissionFilter {
id: IntFilter
name: StringFilter
description: StringFilter
deleted: BooleanFilter
editor: IntFilter
createdUtc: DateFilter
updatedUtc: DateFilter
}
input PermissionInput {
id: ID
name: String
description: String
}

View File

@ -0,0 +1,16 @@
type Query {
ping: String
apiKeys(filter: [ApiKeyFilter], sort: [ApiKeySort], skip: Int, take: Int): ApiKeyResult
permissions(filter: [PermissionFilter], sort: [PermissionSort], skip: Int, take: Int): PermissionResult
roles(filter: [RoleFilter], sort: [RoleSort], skip: Int, take: Int): RoleResult
users(filter: [UserFilter], sort: [UserSort], skip: Int, take: Int): UserResult
user: User
userHasPermission(permission: String!): Boolean
userHasAnyPermission(permissions: [String]!): Boolean
notExistingUsersFromKeycloak: KeycloakUserResult
groups(filter: [GroupFilter], sort: [GroupSort], skip: Int, take: Int): GroupResult
shortUrls(filter: [ShortUrlFilter], sort: [ShortUrlSort], skip: Int, take: Int): ShortUrlResult
}

View File

@ -0,0 +1,60 @@
type RoleResult {
totalCount: Int
count: Int
nodes: [Role]
}
type Role implements DbModel {
id: ID
name: String
description: String
permissions: [Permission]
users: [User]
deleted: Boolean
editor: User
createdUtc: String
updatedUtc: String
}
input RoleSort {
id: SortOrder
name: SortOrder
description: SortOrder
deleted: SortOrder
editorId: SortOrder
createdUtc: SortOrder
updatedUtc: SortOrder
}
input RoleFilter {
id: IntFilter
name: StringFilter
description: StringFilter
deleted: BooleanFilter
editorId: IntFilter
createdUtc: DateFilter
updatedUtc: DateFilter
}
type RoleMutation {
create(input: RoleCreateInput!): Role
update(input: RoleUpdateInput!): Role
delete(id: ID!): Boolean
restore(id: ID!): Boolean
}
input RoleCreateInput {
name: String!
description: String
permissions: [ID]
}
input RoleUpdateInput {
id: ID!
name: String
description: String
permissions: [ID]
}

View File

@ -0,0 +1,50 @@
type ShortUrlResult {
totalCount: Int
count: Int
nodes: [ShortUrl]
}
type ShortUrl implements DbModel {
id: ID
shortUrl: String
targetUrl: String
description: String
group: Group
deleted: Boolean
editor: User
createdUtc: String
updatedUtc: String
}
input ShortUrlSort {
id: SortOrder
name: SortOrder
description: SortOrder
group: GroupSort
deleted: SortOrder
editorId: SortOrder
createdUtc: SortOrder
updatedUtc: SortOrder
}
input ShortUrlFilter {
id: IntFilter
name: StringFilter
description: StringFilter
group: GroupFilter
deleted: BooleanFilter
editor: IntFilter
createdUtc: DateFilter
updatedUtc: DateFilter
}
input ShortUrlInput {
id: ID
shortUrl: String
targetUrl: String
description: String
group: ID
}

View File

@ -0,0 +1,70 @@
type KeycloakUserResult {
totalCount: Int
count: Int
nodes: [KeycloakUser]
}
type KeycloakUser {
keycloakId: String
username: String
}
type UserResult {
totalCount: Int
count: Int
nodes: [User]
}
type User implements DbModel {
id: ID
keycloakId: String
username: String
email: String
roles: [Role]
deleted: Boolean
editor: User
createdUtc: String
updatedUtc: String
}
input UserSort {
id: SortOrder
keycloakId: SortOrder
username: SortOrder
email: SortOrder
deleted: SortOrder
editorId: SortOrder
createdUtc: SortOrder
updatedUtc: SortOrder
}
input UserFilter {
id: IntFilter
keycloakId: StringFilter
username: StringFilter
email: StringFilter
deleted: BooleanFilter
editor: IntFilter
createdUtc: DateFilter
updatedUtc: DateFilter
}
type UserMutation {
create(input: UserCreateInput!): User
update(input: UserUpdateInput!): User
delete(id: ID!): Boolean
restore(id: ID!): Boolean
}
input UserCreateInput {
keycloakId: String
roles: [ID]
}
input UserUpdateInput {
id: ID
roles: [ID]
}

View File

View File

@ -0,0 +1,21 @@
from typing import Optional
from api_graphql.abc.input_abc import InputABC
from data.schemas.permission.permission import Permission
class ApiKeyCreateInput(InputABC):
def __init__(self, src: dict):
InputABC.__init__(self, src)
self._identifier = self.option("identifier", str, required=True)
self._permissions = self.option("permissions", list[int], default=[])
@property
def identifier(self) -> str:
return self._identifier
@property
def permissions(self) -> list[int]:
return self._permissions

View File

@ -0,0 +1,25 @@
from typing import Optional
from api_graphql.abc.input_abc import InputABC
class ApiKeyUpdateInput(InputABC):
def __init__(self, src: dict):
InputABC.__init__(self, src)
self._id = self.option("id", int, required=True)
self._identifier = self.option("identifier", str)
self._permissions = self.option("permissions", list[int])
@property
def id(self) -> int:
return self._id
@property
def identifier(self) -> Optional[str]:
return self._identifier
@property
def permissions(self) -> Optional[list[int]]:
return self._permissions

View File

@ -0,0 +1,18 @@
from api_graphql.abc.input_abc import InputABC
class AttachmentInput(InputABC):
def __init__(self, src: dict):
InputABC.__init__(self, src)
self._filename = self.option("filename", str, required=True)
self._base64 = self.option("base64", str, required=True)
@property
def filename(self) -> str:
return self._filename
@property
def base64(self) -> str:
return self._base64

View File

@ -0,0 +1,31 @@
from api_graphql.abc.input_abc import InputABC
from api_graphql.input.attachment_input import AttachmentInput
class NewsCreateInput(InputABC):
def __init__(self, src: dict):
InputABC.__init__(self, src)
self._title = self.option("title", str, required=True)
self._content = self.option("content", str, required=True)
self._published = self.option("published", bool, default=False)
self._attachments = self.option(
"attachments", list[AttachmentInput], default=[]
)
@property
def title(self) -> str:
return self._title
@property
def content(self) -> str:
return self._content
@property
def published(self) -> bool:
return self._published
@property
def attachments(self) -> list[AttachmentInput]:
return self._attachments

View File

@ -0,0 +1,36 @@
from api_graphql.abc.input_abc import InputABC
from api_graphql.input.attachment_input import AttachmentInput
class NewsUpdateInput(InputABC):
def __init__(self, src: dict):
InputABC.__init__(self, src)
self._id = self.option("id", int, required=True)
self._title = self.option("title", str)
self._content = self.option("content", str)
self._published = self.option("published", bool, default=False)
self._attachments = self.option(
"attachments", list[AttachmentInput], default=[]
)
@property
def id(self) -> int:
return self._id
@property
def title(self) -> str:
return self._title
@property
def content(self) -> str:
return self._content
@property
def published(self) -> bool:
return self._published
@property
def attachments(self) -> list[AttachmentInput]:
return self._attachments

View File

@ -0,0 +1,26 @@
from typing import Optional
from api_graphql.abc.input_abc import InputABC
from data.schemas.permission.permission import Permission
class RoleCreateInput(InputABC):
def __init__(self, src: dict):
InputABC.__init__(self, src)
self._name = self.option("name", str, required=True)
self._description = self.option("description", str)
self._permissions = self.option("permissions", list[int], default=[])
@property
def name(self) -> str:
return self._name
@property
def description(self) -> Optional[str]:
return self._description
@property
def permissions(self) -> list[int]:
return self._permissions

View File

@ -0,0 +1,31 @@
from typing import Optional
from api_graphql.abc.input_abc import InputABC
from service.permission.permissions_enum import Permissions
class RoleUpdateInput(InputABC):
def __init__(self, src: dict):
InputABC.__init__(self, src)
self._id = self.option("id", int, required=True)
self._name = self.option("name", str)
self._description = self.option("description", str)
self._permissions = self.option("permissions", list[int])
@property
def id(self) -> int:
return self._id
@property
def name(self) -> Optional[str]:
return self._name
@property
def description(self) -> Optional[str]:
return self._description
@property
def permissions(self) -> Optional[list[int]]:
return self._permissions

View File

@ -0,0 +1,18 @@
from api_graphql.abc.input_abc import InputABC
class UserCreateInput(InputABC):
def __init__(self, src: dict):
InputABC.__init__(self, src)
self._keycloak_id = self.option("keycloakId", str, required=True)
self._roles = self.option("roles", list[int], default=[])
@property
def keycloak_id(self) -> str:
return self._keycloak_id
@property
def roles(self) -> list[int]:
return self._roles

View File

@ -0,0 +1,18 @@
from api_graphql.abc.input_abc import InputABC
class UserUpdateInput(InputABC):
def __init__(self, src: dict):
InputABC.__init__(self, src)
self._id = self.option("id", int, required=True)
self._roles = self.option("roles", list[int], default=[])
@property
def id(self) -> int:
return self._id
@property
def roles(self) -> list[int]:
return self._roles

View File

@ -0,0 +1,34 @@
from api_graphql.abc.mutation_abc import MutationABC
from service.permission.permissions_enum import Permissions
class Mutation(MutationABC):
def __init__(self):
MutationABC.__init__(self, "")
self.add_mutation_type(
"apiKey",
"ApiKey",
require_any_permission=[
Permissions.api_keys_create,
Permissions.api_keys_update,
Permissions.api_keys_delete,
],
)
self.add_mutation_type(
"role",
"Role",
require_any_permission=[
Permissions.roles_create,
Permissions.roles_update,
Permissions.roles_delete,
],
)
self.add_mutation_type(
"user",
"User",
require_any_permission=[
Permissions.users_update,
Permissions.users_delete,
],
)

View File

@ -0,0 +1,133 @@
from uuid import uuid4
from api_graphql.abc.mutation_abc import MutationABC
from api_graphql.input.api_key_create_input import ApiKeyCreateInput
from api_graphql.input.api_key_update_input import ApiKeyUpdateInput
from core.logger import APILogger
from data.schemas.administration.api_key import ApiKey
from data.schemas.administration.api_key_dao import apiKeyDao
from data.schemas.permission.api_key_permission import ApiKeyPermission
from data.schemas.permission.api_key_permission_dao import apiKeyPermissionDao
from service.permission.permissions_enum import Permissions
logger = APILogger(__name__)
class APIKeyMutation(MutationABC):
def __init__(self):
MutationABC.__init__(self, "ApiKey")
self.mutation(
"create",
self.resolve_create,
ApiKeyCreateInput,
require_any_permission=[Permissions.api_keys_create],
)
self.mutation(
"update",
self.resolve_update,
ApiKeyUpdateInput,
require_any_permission=[Permissions.api_keys_update],
)
self.mutation(
"delete",
self.resolve_delete,
require_any_permission=[Permissions.api_keys_delete],
)
self.mutation(
"restore",
self.resolve_restore,
require_any_permission=[Permissions.api_keys_delete],
)
@staticmethod
async def resolve_create(obj: ApiKeyCreateInput, *_):
logger.debug(f"create api key: {obj.__dict__}")
api_key = ApiKey(
0,
obj.identifier,
str(uuid4()),
)
await apiKeyDao.create(api_key)
api_key = await apiKeyDao.get_by_identifier(api_key.identifier)
await apiKeyPermissionDao.create_many(
[ApiKeyPermission(0, api_key.id, x) for x in obj.permissions]
)
return api_key
@staticmethod
async def resolve_update(obj: ApiKeyUpdateInput, *_):
logger.debug(f"update api key: {input}")
api_key = await apiKeyDao.get_by_id(obj.id)
if obj.permissions is not None:
permissions = [
x for x in await apiKeyPermissionDao.get_by_role_id(api_key.id)
]
to_delete = (
permissions
if len(obj.permissions) == 0
else await apiKeyPermissionDao.find_by(
[
{ApiKeyPermission.api_key_id: api_key.id},
{
ApiKeyPermission.permission_id: {
"notIn": obj.get("permissions", [])
}
},
]
)
)
permission_ids = [x.permission_id for x in permissions]
deleted_permission_ids = [
x.permission_id
for x in await apiKeyPermissionDao.find_by(
[
{ApiKeyPermission.api_key_id: api_key.id},
{ApiKeyPermission.deleted: True},
]
)
]
to_create = [
ApiKeyPermission(0, api_key.id, x)
for x in obj.permissions
if x not in permission_ids and x not in deleted_permission_ids
]
to_restore = [
await apiKeyPermissionDao.get_single_by(
[
{ApiKeyPermission.api_key_id: api_key.id},
{ApiKeyPermission.permission_id: x},
]
)
for x in obj.permissions
if x not in permission_ids and x in deleted_permission_ids
]
if len(to_delete) > 0:
await apiKeyPermissionDao.delete_many(to_delete)
if len(to_create) > 0:
await apiKeyPermissionDao.create_many(to_create)
if len(to_restore) > 0:
await apiKeyPermissionDao.restore_many(to_restore)
return api_key
@staticmethod
async def resolve_delete(*_, id: str):
logger.debug(f"delete api key: {id}")
api_key = await apiKeyDao.get_by_id(id)
await apiKeyDao.delete(api_key)
return True
@staticmethod
async def resolve_restore(*_, id: str):
logger.debug(f"restore api key: {id}")
api_key = await apiKeyDao.get_by_id(id)
await apiKeyDao.restore(api_key)
return True

View File

@ -0,0 +1,129 @@
from api_graphql.abc.mutation_abc import MutationABC
from api_graphql.input.role_create_input import RoleCreateInput
from api_graphql.input.role_update_input import RoleUpdateInput
from core.logger import APILogger
from data.schemas.permission.role import Role
from data.schemas.permission.role_dao import roleDao
from data.schemas.permission.role_permission import RolePermission
from data.schemas.permission.role_permission_dao import rolePermissionDao
from service.permission.permissions_enum import Permissions
logger = APILogger(__name__)
class RoleMutation(MutationABC):
def __init__(self):
MutationABC.__init__(self, "Role")
self.mutation(
"create",
self.resolve_create,
RoleCreateInput,
require_any_permission=[Permissions.roles_create],
)
self.mutation(
"update",
self.resolve_update,
RoleUpdateInput,
require_any_permission=[Permissions.roles_update],
)
self.mutation(
"delete",
self.resolve_delete,
require_any_permission=[Permissions.roles_delete],
)
self.mutation(
"restore",
self.resolve_restore,
require_any_permission=[Permissions.roles_delete],
)
@staticmethod
async def resolve_create(obj: RoleCreateInput, *_):
logger.debug(f"create role: {obj.__dict__}")
role = Role(
0,
obj.name,
obj.description,
)
await roleDao.create(role)
role = await roleDao.get_by_name(role.name)
await rolePermissionDao.create_many(
[RolePermission(0, role.id, x) for x in obj.permissions]
)
return role
@staticmethod
async def resolve_update(obj: RoleUpdateInput, *_):
logger.debug(f"update role: {obj.__dict__}")
role = await roleDao.get_by_id(obj.id)
role.name = obj.get("name", role.name)
role.description = obj.get("description", role.description)
await roleDao.update(role)
if obj.permissions is not None:
permissions = [x for x in await rolePermissionDao.get_by_role_id(role.id)]
to_delete = (
permissions
if len(obj.permissions) == 0
else await rolePermissionDao.find_by(
[
{RolePermission.role_id: role.id},
{
RolePermission.permission_id: {
"notIn": obj.get("permissions", [])
}
},
]
)
)
permission_ids = [x.permission_id for x in permissions]
deleted_permission_ids = [
x.permission_id
for x in await rolePermissionDao.find_by(
[{RolePermission.role_id: role.id}, {RolePermission.deleted: True}]
)
]
to_create = [
RolePermission(0, role.id, x)
for x in obj.permissions
if x not in permission_ids and x not in deleted_permission_ids
]
to_restore = [
await rolePermissionDao.get_single_by(
[
{RolePermission.role_id: role.id},
{RolePermission.permission_id: x},
]
)
for x in obj.permissions
if x not in permission_ids and x in deleted_permission_ids
]
if len(to_delete) > 0:
await rolePermissionDao.delete_many(to_delete)
if len(to_create) > 0:
await rolePermissionDao.create_many(to_create)
if len(to_restore) > 0:
await rolePermissionDao.restore_many(to_restore)
return role
@staticmethod
async def resolve_delete(*_, id: int):
logger.debug(f"delete role: {id}")
role = await roleDao.get_by_id(id)
await roleDao.delete(role)
return True
@staticmethod
async def resolve_restore(*_, id: int):
logger.debug(f"restore role: {id}")
role = await roleDao.get_by_id(id)
await roleDao.restore(role)
return True

View File

@ -0,0 +1,123 @@
from api.auth.keycloak_client import Keycloak
from api_graphql.abc.mutation_abc import MutationABC
from api_graphql.input.user_create_input import UserCreateInput
from api_graphql.input.user_update_input import UserUpdateInput
from core.logger import APILogger
from data.schemas.administration.user import User
from data.schemas.administration.user_dao import userDao
from data.schemas.permission.role_user import RoleUser
from data.schemas.permission.role_user_dao import roleUserDao
from service.permission.permissions_enum import Permissions
logger = APILogger(__name__)
class UserMutation(MutationABC):
def __init__(self):
MutationABC.__init__(self, "User")
self.mutation(
"create",
self.resolve_create,
UserCreateInput,
require_any_permission=[Permissions.users_create],
)
self.mutation(
"update",
self.resolve_update,
UserUpdateInput,
require_any_permission=[Permissions.users_update],
)
self.mutation(
"delete",
self.resolve_delete,
require_any_permission=[Permissions.users_delete],
)
self.mutation(
"restore",
self.resolve_restore,
require_any_permission=[Permissions.users_delete],
)
@staticmethod
async def resolve_create(obj: UserCreateInput, *_):
logger.debug(f"create user: {obj.__dict__}")
# ensure keycloak knows a user with this keycloak_id
# get_user should raise an exception if the user does not exist
kc_user = Keycloak.admin.get_user(obj.keycloak_id)
if kc_user is None:
raise ValueError(f"Keycloak user with id {obj.keycloak_id} does not exist")
user = User(0, obj.keycloak_id)
await userDao.create(user)
user = await userDao.get_by_keycloak_id(user.keycloak_id)
await roleUserDao.create_many([RoleUser(0, user.id, x) for x in obj.roles])
return user
@staticmethod
async def resolve_update(obj: UserUpdateInput, *_):
logger.debug(f"update user: {obj.__dict__}")
user = await userDao.get_by_id(obj.id)
if obj.roles is not None:
roles = await roleUserDao.get_by_user_id(user.id)
to_delete = (
roles
if len(obj.roles) == 0
else await roleUserDao.find_by(
[
{RoleUser.user_id: user.id},
{RoleUser.role_id: {"notIn": obj.get("roles", [])}},
]
)
)
role_ids = [x.role_id for x in roles]
deleted_role_ids = [
x.role_id
for x in await roleUserDao.find_by(
[{RoleUser.user_id: user.id}, {RoleUser.deleted: True}]
)
]
to_create = [
RoleUser(0, x, user.id)
for x in obj.roles
if x not in role_ids and x not in deleted_role_ids
]
to_restore = [
await roleUserDao.get_single_by(
[
{RoleUser.user_id: user.id},
{RoleUser.role_id: x},
]
)
for x in obj.roles
if x not in role_ids and x in deleted_role_ids
]
if len(to_delete) > 0:
await roleUserDao.delete_many(to_delete)
if len(to_create) > 0:
await roleUserDao.create_many(to_create)
if len(to_restore) > 0:
await roleUserDao.restore_many(to_restore)
return user
@staticmethod
async def resolve_delete(*_, id: int):
logger.debug(f"delete user: {id}")
user = await userDao.get_by_id(id)
await userDao.delete(user)
return True
@staticmethod
async def resolve_restore(*_, id: int):
logger.debug(f"restore user: {id}")
user = await userDao.get_by_id(id)
await userDao.restore(user)
return True

View File

View File

@ -0,0 +1,9 @@
from api_graphql.abc.db_model_query_abc import DbModelQueryABC
class ApiKeyQuery(DbModelQueryABC):
def __init__(self):
DbModelQueryABC.__init__(self, "ApiKey")
self.set_field("identifier", lambda x, *_: x.identifier)
self.set_field("key", lambda x, *_: x.key)

View File

@ -0,0 +1,9 @@
from api_graphql.abc.db_model_query_abc import DbModelQueryABC
class GroupQuery(DbModelQueryABC):
def __init__(self):
DbModelQueryABC.__init__(self, "Group")
self.set_field("name", lambda x, *_: x.name)
self.set_field("description", lambda x, *_: x.description)

View File

@ -0,0 +1,9 @@
from api_graphql.abc.db_model_query_abc import DbModelQueryABC
class PermissionQuery(DbModelQueryABC):
def __init__(self):
DbModelQueryABC.__init__(self, "Permission")
self.set_field("name", lambda x, *_: x.name)
self.set_field("description", lambda x, *_: x.description)

View File

@ -0,0 +1,11 @@
from api_graphql.abc.db_model_query_abc import DbModelQueryABC
class RoleQuery(DbModelQueryABC):
def __init__(self):
DbModelQueryABC.__init__(self, "Role")
self.set_field("name", lambda x, *_: x.name)
self.set_field("description", lambda x, *_: x.description)
self.set_field("permissions", lambda x, *_: x.permissions)
self.set_field("users", lambda x, *_: x.users)

View File

@ -0,0 +1,11 @@
from api_graphql.abc.db_model_query_abc import DbModelQueryABC
class ShortUrlQuery(DbModelQueryABC):
def __init__(self):
DbModelQueryABC.__init__(self, "ShortUrl")
self.set_field("shortUrl", lambda x, *_: x.short_url)
self.set_field("targetUrl", lambda x, *_: x.target_url)
self.set_field("description", lambda x, *_: x.description)
self.set_field("group", lambda x, *_: x.group)

View File

@ -0,0 +1,11 @@
from api_graphql.abc.db_model_query_abc import DbModelQueryABC
class UserQuery(DbModelQueryABC):
def __init__(self):
DbModelQueryABC.__init__(self, "User")
self.set_field("keycloakId", lambda x, *_: x.keycloak_id)
self.set_field("username", lambda x, *_: x.username)
self.set_field("email", lambda x, *_: x.email)
self.set_field("roles", lambda x, *_: x.roles)

Some files were not shown because too many files have changed in this diff Show More