[WIP] collection #181
Some checks failed
Test before pr merge / test-lint (pull_request) Failing after 6s

This commit is contained in:
2025-09-27 04:08:32 +02:00
parent 7673c3d10e
commit 1b6423c079
10 changed files with 144 additions and 33 deletions

View File

@@ -1,6 +1,29 @@
from cpl.api.middleware.request import get_request
from cpl.graphql.schema.filter.filter import Filter
from cpl.graphql.schema.query import Query
from cpl.graphql.schema.sort.sort import Sort
class User:
def __init__(self, id: int, name: str):
self.id = id
self.name = name
class UserFilter(Filter[User]):
def __init__(self, obj: dict):
Filter.__init__(self, obj)
self.field("id", int)
self.field("name", str)
class UserSort(Sort[User]):
def __init__(self, obj: dict):
Sort.__init__(self, obj)
self.field("id")
self.field("name")
class HelloQuery(Query):
def __init__(self):
@@ -9,3 +32,15 @@ class HelloQuery(Query):
"message",
resolver=lambda *_, name: f"Hello {name} {get_request().state.request_id}",
).with_argument(str, "name", "Name to greet", "world")
self.collection_field(
int,
"items",
UserFilter,
UserSort,
resolver=self._resolve_collection,
)
@staticmethod
def _resolve_collection(filter, sort, skip, take):
return []

View File

@@ -37,7 +37,7 @@ class Field:
def with_argument(self, arg_type: type, name: str, description: str = None, default_value=None) -> Self:
if name in self._args:
raise ValueError(f"Argument with name '{name}' already exists in field '{self._name}'")
self._args[name] = Argument(name, arg_type, description, default_value)
self._args[name] = Argument(arg_type, name, description, default_value)
return self
def with_arguments(self, args: list[Argument]) -> Self:
@@ -45,5 +45,5 @@ class Field:
if not isinstance(arg, Argument):
raise ValueError(f"Expected Argument instance, got {type(arg)}")
self.with_argument(arg.name, arg.type, arg.description, arg.default_value)
self.with_argument(arg.type, arg.name, arg.description, arg.default_value)
return self

View File

@@ -0,0 +1,26 @@
from abc import ABC
from datetime import datetime
from typing import Type
class Filter[T](ABC):
def __init__(
self,
obj: dict | None,
):
ABC.__init__(self)
self._obj = obj
self._values = {}
def field(
self,
field: str,
filter_type: Type["Filter"] | Type[int | str | bool | datetime | list],
):
if field not in self._obj:
return
# if issubclass
self._values[field] = filter_type(self._obj[field])

View File

@@ -2,9 +2,9 @@ from typing import Callable, Type
from graphene import ObjectType
from cpl.graphql.schema.argument import Argument
from cpl.graphql.schema.field import Field
from cpl.graphql.typing import Resolver
from cpl.graphql.utils.type_converter import TypeConverter
class Query(ObjectType):
@@ -32,7 +32,7 @@ class Query(ObjectType):
def with_query(self, name: str, subquery: Type["Query"]):
from cpl.graphql.schema.field import Field
f = Field(name=name, gql_type=object, resolver=lambda root, info, **kwargs: {}, subquery=subquery)
f = Field(name=name, gql_type=subquery, resolver=lambda root, info, **kwargs: {}, subquery=subquery)
self._fields[name] = f
return self._fields[name]
@@ -47,3 +47,15 @@ class Query(ObjectType):
def bool_field(self, name: str, resolver: Resolver = None) -> "Field":
return self.field(name, bool, resolver)
def collection_field(
self, t: type, name: str, filter_type: type, sort_type: type, resolver: Resolver = None
) -> "Field":
return self.field(name, list[t], resolver).with_arguments(
[
Argument(filter_type, "filter"),
Argument(sort_type, "sort"),
Argument(int, "skip", default_value=0),
Argument(int, "take", default_value=10),
]
)

View File

@@ -0,0 +1,25 @@
from abc import ABC
from cpl.graphql.schema.sort.sort_order import SortOrder
class Sort[T](ABC):
def __init__(
self,
obj: dict | None,
):
ABC.__init__(self)
self._obj = obj
self._values = {}
def field(
self,
field: str,
):
if field not in self._obj:
return
# if issubclass
self._values[field] = SortOrder.DESC

View File

@@ -0,0 +1,6 @@
from enum import Enum, auto
class SortOrder(Enum):
ASC = auto()
DESC = auto()

View File

@@ -1,3 +1,4 @@
from inspect import isclass
from typing import Type
import graphene
@@ -41,7 +42,7 @@ class Schema:
arguments = {}
if args is not None:
arguments = {
arg.name: graphene.Argument(TypeConverter.to_graphene(arg.type), description=arg.description, default_value=arg.default_value)
arg.name: graphene.Argument(TypeConverter.to_graphene(arg.type), name=arg.name, description=arg.description, default_value=arg.default_value)
for arg in args.values()
}
@@ -52,7 +53,7 @@ class Schema:
attrs = {}
for field in query.get_fields().values():
if field.type == object and field.subquery is not None:
if isclass(field.type) and issubclass(field.type, Query) and field.subquery is not None:
subquery = self._provider.get_service(field.subquery)
sub = self.to_graphene(subquery, name=field.name.capitalize())
attrs[field.name] = self._field_to_graphene(sub, field.args, field.resolver)

View File

@@ -1,38 +1,44 @@
from typing import Type
import typing
import graphene
from typing import Any, get_origin, get_args
from cpl.graphql.typing import ScalarType
from cpl.graphql.schema.filter.filter import Filter
from cpl.graphql.schema.sort.sort import Sort
class TypeConverter:
scalar_map: dict[Any, type[graphene.Scalar]] = {
str: graphene.String,
int: graphene.Int,
float: graphene.Float,
bool: graphene.Boolean,
}
@staticmethod
def from_graphene(t: Type[graphene.Scalar]) -> ScalarType:
graphene_type_map: dict[Type[graphene.Scalar], ScalarType] = {
graphene.String: str,
graphene.Int: int,
graphene.Float: float,
graphene.Boolean: bool,
graphene.ObjectType: object,
}
@classmethod
def to_graphene(cls, t: Any) -> Any:
origin = get_origin(t)
args = get_args(t)
if t not in graphene_type_map:
raise ValueError(f"Unsupported field type: {t}")
if t in cls.scalar_map:
return cls.scalar_map[t]
return graphene_type_map[t]
if origin in (list, typing.List):
if not args:
raise ValueError("List must specify element type, e.g. list[str]")
inner = cls.to_graphene(args[0])
return graphene.List(inner)
@staticmethod
def to_graphene(t: ScalarType) -> Type[graphene.Scalar]:
type_graphene_map: dict[ScalarType, Type[graphene.Scalar]] = {
str: graphene.String,
int: graphene.Int,
float: graphene.Float,
bool: graphene.Boolean,
object: graphene.ObjectType,
}
if t is list or t is typing.List:
raise ValueError("List must be parametrized: list[str], list[int], list[UserQuery]")
if t not in type_graphene_map:
raise ValueError(f"Unsupported field type: {t}")
from cpl.graphql.schema.query import Query
if isinstance(t, type) and issubclass(t, Query):
return t
return type_graphene_map[t]
if isinstance(t, type) and issubclass(t, Filter):
return t
if isinstance(t, type) and issubclass(t, Sort):
return t
raise ValueError(f"Unsupported field type: {t}")