WIP: dev into master #184

Draft
edraft wants to merge 121 commits from dev into master
8 changed files with 137 additions and 57 deletions
Showing only changes of commit 683805137a - Show all commits

View File

@@ -1,5 +1,4 @@
import graphene
from cpl.api.middleware.request import get_request
from cpl.graphql.schema.query import Query
@@ -8,6 +7,5 @@ class HelloQuery(Query):
Query.__init__(self)
self.string_field(
"message",
args={"name": graphene.String(default_value="world")},
resolver=lambda *_, name: f"Hello {name}",
)
resolver=lambda *_, name: f"Hello {name} {get_request().state.request_id}",
).with_argument(str, "name", "Name to greet", "world")

View File

@@ -0,0 +1,22 @@
class Argument:
def __init__(self, t: type, name: str, description: str = None, default_value=None):
self._type = t
self._name = name
self._description = description
self._default_value = default_value
@property
def type(self) -> type:
return self._type
@property
def name(self) -> str:
return self._name
@property
def description(self) -> str | None:
return self._description
@property
def default_value(self):
return self._default_value

View File

@@ -1,20 +1,25 @@
from cpl.graphql.schema.query import Query
from typing import Self
from cpl.graphql.schema.argument import Argument
from cpl.graphql.typing import TQuery
class Field:
def __init__(self, name: str, gql_type: str, resolver: callable, args: dict | None = None, subquery: Query | None = None):
def __init__(self, name: str, gql_type: type, resolver: callable, subquery: TQuery | None = None):
self._name = name
self._gql_type = gql_type
self._resolver = resolver
self._args = args or {}
self._subquery: Query | None = subquery
self._subquery = subquery
self._args: dict[str, Argument] = {}
@property
def name(self) -> str:
return self._name
@property
def type(self) -> str:
def type(self) -> type:
return self._gql_type
@property
@@ -26,5 +31,19 @@ class Field:
return self._args
@property
def subquery(self) -> Query | None:
def subquery(self) -> TQuery | None:
return self._subquery
def with_argument(self, arg_type: type, name: str, description: str = None, default_value=None) -> Self:
if name in self._args:
raise ValueError(f"Argument with name '{name}' already exists in field '{self._name}'")
self._args[name] = Argument(name, arg_type, description, default_value)
return self
def with_arguments(self, args: list[Argument]) -> Self:
for arg in args:
if not isinstance(arg, Argument):
raise ValueError(f"Expected Argument instance, got {type(arg)}")
self.with_argument(arg.name, arg.type, arg.description, arg.default_value)
return self

View File

@@ -1,7 +1,11 @@
from typing import Callable, Any, Type
from typing import Callable, Type
from graphene import ObjectType
from cpl.graphql.schema.field import Field
from cpl.graphql.typing import Resolver
from cpl.graphql.utils.type_converter import TypeConverter
class Query(ObjectType):
@@ -11,44 +15,35 @@ class Query(ObjectType):
ObjectType.__init__(self)
self._fields: dict[str, Field] = {}
def get_fields(self) -> dict[str, "Field"]:
def get_fields(self) -> dict[str, Field]:
return self._fields
def field(
self,
name: str,
t: type,
args: dict[str, Any] | None = None,
resolver: Callable | None = None,
):
gql_type_map: dict[object, str] = {
str: "String",
int: "Int",
float: "Float",
bool: "Boolean",
}
if t not in gql_type_map:
raise ValueError(f"Unsupported field type: {t}")
self,
name: str,
t: type,
resolver: Callable | None = None,
) -> "Field":
from cpl.graphql.schema.field import Field
self._fields[name] = Field(name, "String", resolver, args)
self._fields[name] = Field(name, t, resolver)
return self._fields[name]
def with_query(self, name: str, subquery: Type["Query"]):
from cpl.graphql.schema.field import Field
f = Field(name=name, gql_type="Object", resolver=lambda root, info, **kwargs: {}, subquery=subquery)
f = Field(name=name, gql_type=object, resolver=lambda root, info, **kwargs: {}, subquery=subquery)
self._fields[name] = f
return self._fields[name]
def string_field(self, name: str, args: dict[str, Any] | None = None, resolver: Callable | None = None):
self.field(name, str, args, resolver)
def string_field(self, name: str, resolver: Resolver = None) -> "Field":
return self.field(name, str, resolver)
def int_field(self, name: str, args: dict[str, Any] | None = None, resolver: Callable | None = None):
self.field(name, int, args, resolver)
def int_field(self, name: str, resolver: Resolver = None) -> "Field":
return self.field(name, int, resolver)
def float_field(self, name: str, args: dict[str, Any] | None = None, resolver: Callable | None = None):
self.field(name, float, args, resolver)
def float_field(self, name: str, resolver: Resolver = None) -> "Field":
return self.field(name, float, resolver)
def bool_field(self, name: str, args: dict[str, Any] | None = None, resolver: Callable | None = None):
self.field(name, bool, args, resolver)
def bool_field(self, name: str, resolver: Resolver = None) -> "Field":
return self.field(name, bool, resolver)

View File

@@ -1,9 +1,14 @@
from typing import Type
import graphene
from cpl.api.logger import APILogger
from cpl.dependency.service_provider import ServiceProvider
from cpl.graphql.schema.argument import Argument
from cpl.graphql.schema.query import Query
from cpl.graphql.schema.root_query import RootQuery
from cpl.graphql.typing import Resolver
from cpl.graphql.utils.type_converter import TypeConverter
class Schema:
@@ -31,26 +36,29 @@ class Schema:
)
return self._schema
@staticmethod
def _field_to_graphene(t: Type[graphene.Scalar] | type, args: dict[str, Argument] = None, resolver: Resolver = None) -> graphene.Field:
arguments = {}
if args is not None:
arguments = {
arg.name: graphene.Argument(TypeConverter.to_graphene(arg.type), description=arg.description, default_value=arg.default_value)
for arg in args.values()
}
return graphene.Field(t, args=arguments, resolver=resolver)
def to_graphene(self, query: Query, name: str | None = None):
assert query is not None, "Query cannot be None"
attrs = {}
for field in query.get_fields().values():
if field.type == "String":
attrs[field.name] = graphene.Field(
graphene.String,
**field.args,
resolver=field.resolver
)
elif field.type == "Object" and field.subquery is not None:
if field.type == object and field.subquery is not None:
subquery = self._provider.get_service(field.subquery)
sub = self.to_graphene(subquery, name=field.name.capitalize())
attrs[field.name] = graphene.Field(
sub,
**field.args,
resolver=field.resolver
)
attrs[field.name] = self._field_to_graphene(sub, field.args, field.resolver)
continue
attrs[field.name] = self._field_to_graphene(TypeConverter.to_graphene(field.type), field.args, field.resolver)
class_name = name or query.__class__.__name__
return type(class_name, (graphene.ObjectType,), attrs)

View File

@@ -1,5 +1,5 @@
from typing import Type
from typing import Type, Callable
from cpl.graphql.schema.query import Query
TQuery = Type[Query]
TQuery = Type["Query"]
Resolver = Callable
ScalarType = str | int | float | bool | object

View File

@@ -0,0 +1,38 @@
from typing import Type
import graphene
from cpl.graphql.typing import ScalarType
class TypeConverter:
@staticmethod
def from_graphene(t: Type[graphene.Scalar]) -> ScalarType:
graphene_type_map: dict[Type[graphene.Scalar], ScalarType] = {
graphene.String: str,
graphene.Int: int,
graphene.Float: float,
graphene.Boolean: bool,
graphene.ObjectType: object,
}
if t not in graphene_type_map:
raise ValueError(f"Unsupported field type: {t}")
return graphene_type_map[t]
@staticmethod
def to_graphene(t: ScalarType) -> Type[graphene.Scalar]:
type_graphene_map: dict[ScalarType, Type[graphene.Scalar]] = {
str: graphene.String,
int: graphene.Int,
float: graphene.Float,
bool: graphene.Boolean,
object: graphene.ObjectType,
}
if t not in type_graphene_map:
raise ValueError(f"Unsupported field type: {t}")
return type_graphene_map[t]