From 7b37748ca66e2c1bd6fe4bf7e2e38ebd10de979a Mon Sep 17 00:00:00 2001 From: edraft Date: Sun, 21 Sep 2025 21:07:09 +0200 Subject: [PATCH] [WIP] validate token via keycloak --- src/cpl-api/cpl/api/middleware/authentication.py | 15 +++++++++------ src/cpl-auth/cpl/auth/keycloak/keycloak_client.py | 9 +-------- .../cpl/dependency/service_provider_abc.py | 6 +++--- 3 files changed, 13 insertions(+), 17 deletions(-) diff --git a/src/cpl-api/cpl/api/middleware/authentication.py b/src/cpl-api/cpl/api/middleware/authentication.py index d82ed747..4a491042 100644 --- a/src/cpl-api/cpl/api/middleware/authentication.py +++ b/src/cpl-api/cpl/api/middleware/authentication.py @@ -17,12 +17,14 @@ class AuthenticationMiddleware(BaseHTTPMiddleware): async def _verify_login(cls, token: str) -> bool: keycloak = ServiceProviderABC.get_global_service(KeycloakClient) try: - user_info = keycloak.userinfo(token) - if not user_info: - return False - except KeycloakAuthenticationError: + token_info = keycloak.introspect(token) + return token_info.get("active", False) + except KeycloakAuthenticationError as e: + _logger.debug(f"Keycloak authentication error: {e}") + return False + except Exception as e: + _logger.error(f"Unexpected error during token verification: {e}") return False - return True async def dispatch(self, request: Request, call_next): url = request.url.path @@ -40,7 +42,8 @@ class AuthenticationMiddleware(BaseHTTPMiddleware): if not auth_header or not auth_header.startswith("Bearer "): return Unauthorized("Invalid Authorization header").response() - if not await self._verify_login(auth_header.split("Bearer ")[1]): + token = auth_header.split("Bearer ")[1] + if not await self._verify_login(token): _logger.debug(f"Unauthorized access to {url}, invalid token") return Unauthorized("Invalid token").response() diff --git a/src/cpl-auth/cpl/auth/keycloak/keycloak_client.py b/src/cpl-auth/cpl/auth/keycloak/keycloak_client.py index df5cd472..6bc4624c 100644 --- a/src/cpl-auth/cpl/auth/keycloak/keycloak_client.py +++ b/src/cpl-auth/cpl/auth/keycloak/keycloak_client.py @@ -1,4 +1,4 @@ -from keycloak import KeycloakOpenID, KeycloakAdmin, KeycloakOpenIDConnection +from keycloak import KeycloakOpenID from cpl.auth.auth_logger import AuthLogger from cpl.auth.keycloak_settings import KeycloakSettings @@ -17,10 +17,3 @@ class KeycloakClient(KeycloakOpenID): client_secret_key=settings.client_secret, ) _logger.info("Initializing Keycloak client") - connection = KeycloakOpenIDConnection( - server_url=settings.url, - client_id=settings.client_id, - realm_name=settings.realm, - client_secret_key=settings.client_secret, - ) - self._admin = KeycloakAdmin(connection=connection) diff --git a/src/cpl-dependency/cpl/dependency/service_provider_abc.py b/src/cpl-dependency/cpl/dependency/service_provider_abc.py index 6e0c5dda..68980103 100644 --- a/src/cpl-dependency/cpl/dependency/service_provider_abc.py +++ b/src/cpl-dependency/cpl/dependency/service_provider_abc.py @@ -24,19 +24,19 @@ class ServiceProviderABC(ABC): return cls._provider @classmethod - def get_global_service(cls, instance_type: T, *args, **kwargs) -> Optional[R]: + def get_global_service(cls, instance_type: Type[T], *args, **kwargs) -> Optional[T]: if cls._provider is None: return None return cls._provider.get_service(instance_type, *args, **kwargs) @classmethod - def get_global_services(cls, instance_type: T, *args, **kwargs) -> list[Optional[R]]: + def get_global_services(cls, instance_type: Type[T], *args, **kwargs) -> list[Optional[T]]: if cls._provider is None: return [] return cls._provider.get_services(instance_type, *args, **kwargs) @abstractmethod - def _build_by_signature(self, sig: Signature, origin_service_type: type = None) -> list[R]: ... + def _build_by_signature(self, sig: Signature, origin_service_type: type = None) -> list[T]: ... @abstractmethod def _build_service(self, service_type: type, *args, **kwargs) -> object: