Compare commits

..

1 Commits

Author SHA1 Message Date
69bbbc8cee Authorization via with_route
Some checks failed
Build on push / prepare (push) Successful in 9s
Build on push / query (push) Successful in 20s
Build on push / core (push) Successful in 20s
Build on push / dependency (push) Successful in 17s
Build on push / mail (push) Successful in 15s
Build on push / application (push) Successful in 18s
Build on push / translation (push) Successful in 18s
Build on push / database (push) Successful in 25s
Build on push / auth (push) Successful in 15s
Build on push / api (push) Successful in 14s
Test before pr merge / test-lint (pull_request) Failing after 6s
2025-09-22 22:03:42 +02:00
11 changed files with 15 additions and 36 deletions

View File

@@ -1,8 +1,5 @@
from cpl.dependency.service_collection import ServiceCollection as _ServiceCollection
from .error import APIError, AlreadyExists, EndpointNotImplemented, Forbidden, NotFound, Unauthorized
from .logger import APILogger
from .settings import ApiSettings
def add_api(collection: _ServiceCollection):
try:

View File

@@ -1 +0,0 @@
from .asgi_middleware_abc import ASGIMiddleware

View File

@@ -1 +0,0 @@
from .web_app import WebApp

View File

@@ -33,7 +33,6 @@ _logger = APILogger("API")
PolicyInput = Union[dict[str, PolicyResolver], Policy]
class WebApp(ApplicationABC):
def __init__(self, services: ServiceProviderABC):
super().__init__(services, [auth, api])

View File

@@ -1,4 +0,0 @@
from .authentication import AuthenticationMiddleware
from .authorization import AuthorizationMiddleware
from .logging import LoggingMiddleware
from .request import RequestMiddleware

View File

@@ -53,13 +53,9 @@ class AuthorizationMiddleware(ASGIMiddleware):
if rule["permissions"]:
if match == ValidationMatch.all and not all(p in perm_names for p in rule["permissions"]):
return await Forbidden(f"missing permissions: {rule["permissions"]}").asgi_response(
scope, receive, send
)
return await Forbidden(f"missing permissions: {rule["permissions"]}").asgi_response(scope, receive, send)
if match == ValidationMatch.any and not any(p in perm_names for p in rule["permissions"]):
return await Forbidden(f"missing permissions: {rule["permissions"]}").asgi_response(
scope, receive, send
)
return await Forbidden(f"missing permissions: {rule["permissions"]}").asgi_response(scope, receive, send)
for policy_name in rule["policies"]:
policy = self._policies.get(policy_name)
@@ -70,4 +66,4 @@ class AuthorizationMiddleware(ASGIMiddleware):
if not await policy.resolve(user):
return await Forbidden(f"policy {policy.name} failed").asgi_response(scope, receive, send)
return await self._call_next(scope, receive, send)
return await self._call_next(scope, receive, send)

View File

@@ -1,3 +0,0 @@
from .api_route import ApiRoute
from .policy import Policy
from .validation_match import ValidationMatch

View File

@@ -7,7 +7,13 @@ from cpl.api.typing import HTTPMethods
class ApiRoute:
def __init__(self, path: str, fn: Callable, method: HTTPMethods, **kwargs):
def __init__(
self,
path: str,
fn: Callable,
method: HTTPMethods,
**kwargs
):
self._path = path
self._fn = fn
self._method = method

View File

@@ -1,2 +0,0 @@
from .policy import PolicyRegistry
from .route import RouteRegistry

View File

@@ -41,13 +41,7 @@ class Router:
return inner
@classmethod
def authorize(
cls,
roles: list[str | Enum] = None,
permissions: list[str | Enum] = None,
policies: list[str] = None,
match: ValidationMatch = None,
):
def authorize(cls, roles: list[str | Enum]=None, permissions: list[str | Enum]=None, policies: list[str]=None, match: ValidationMatch=None):
"""
Decorator to mark a route as requiring authorization.
Usage:
@@ -91,15 +85,15 @@ class Router:
return inner
@classmethod
def route(cls, path: str, method: HTTPMethods, registry: RouteRegistry = None, **kwargs):
def route(cls, path: str, method: HTTPMethods, registry: RouteRegistry=None, **kwargs):
if not registry:
from cpl.api.model.api_route import ApiRoute
from cpl.dependency.service_provider_abc import ServiceProviderABC
routes = ServiceProviderABC.get_global_service(RouteRegistry)
else:
routes = registry
def inner(fn):
routes.add(ApiRoute(path, fn, method, **kwargs))
setattr(fn, "_route_path", path)
@@ -144,9 +138,7 @@ class Router:
from cpl.api.model.api_route import ApiRoute
from cpl.dependency.service_provider_abc import ServiceProviderABC
routes = ServiceProviderABC.get_global_service(RouteRegistry)
def inner(fn):
path = getattr(fn, "_route_path", None)
if path is None:
@@ -155,7 +147,7 @@ class Router:
route = routes.get(path)
if route is None:
raise ValueError(f"Cannot override a route that does not exist: {path}")
routes.add(ApiRoute(path, fn, route.method, **route.kwargs))
setattr(fn, "_route_path", path)
return fn

View File

@@ -16,4 +16,4 @@ PartialMiddleware = Union[
Middleware,
Callable[[ASGIApp], ASGIApp],
]
PolicyResolver = Callable[[AuthUser], bool | Awaitable[bool]]
PolicyResolver = Callable[[AuthUser], bool | Awaitable[bool]]