[WIP] validate token via keycloak

This commit is contained in:
2025-09-21 21:07:09 +02:00
parent 073b35f71a
commit 7b37748ca6
3 changed files with 13 additions and 17 deletions

View File

@@ -17,12 +17,14 @@ class AuthenticationMiddleware(BaseHTTPMiddleware):
async def _verify_login(cls, token: str) -> bool:
keycloak = ServiceProviderABC.get_global_service(KeycloakClient)
try:
user_info = keycloak.userinfo(token)
if not user_info:
return False
except KeycloakAuthenticationError:
token_info = keycloak.introspect(token)
return token_info.get("active", False)
except KeycloakAuthenticationError as e:
_logger.debug(f"Keycloak authentication error: {e}")
return False
except Exception as e:
_logger.error(f"Unexpected error during token verification: {e}")
return False
return True
async def dispatch(self, request: Request, call_next):
url = request.url.path
@@ -40,7 +42,8 @@ class AuthenticationMiddleware(BaseHTTPMiddleware):
if not auth_header or not auth_header.startswith("Bearer "):
return Unauthorized("Invalid Authorization header").response()
if not await self._verify_login(auth_header.split("Bearer ")[1]):
token = auth_header.split("Bearer ")[1]
if not await self._verify_login(token):
_logger.debug(f"Unauthorized access to {url}, invalid token")
return Unauthorized("Invalid token").response()

View File

@@ -1,4 +1,4 @@
from keycloak import KeycloakOpenID, KeycloakAdmin, KeycloakOpenIDConnection
from keycloak import KeycloakOpenID
from cpl.auth.auth_logger import AuthLogger
from cpl.auth.keycloak_settings import KeycloakSettings
@@ -17,10 +17,3 @@ class KeycloakClient(KeycloakOpenID):
client_secret_key=settings.client_secret,
)
_logger.info("Initializing Keycloak client")
connection = KeycloakOpenIDConnection(
server_url=settings.url,
client_id=settings.client_id,
realm_name=settings.realm,
client_secret_key=settings.client_secret,
)
self._admin = KeycloakAdmin(connection=connection)

View File

@@ -24,19 +24,19 @@ class ServiceProviderABC(ABC):
return cls._provider
@classmethod
def get_global_service(cls, instance_type: T, *args, **kwargs) -> Optional[R]:
def get_global_service(cls, instance_type: Type[T], *args, **kwargs) -> Optional[T]:
if cls._provider is None:
return None
return cls._provider.get_service(instance_type, *args, **kwargs)
@classmethod
def get_global_services(cls, instance_type: T, *args, **kwargs) -> list[Optional[R]]:
def get_global_services(cls, instance_type: Type[T], *args, **kwargs) -> list[Optional[T]]:
if cls._provider is None:
return []
return cls._provider.get_services(instance_type, *args, **kwargs)
@abstractmethod
def _build_by_signature(self, sig: Signature, origin_service_type: type = None) -> list[R]: ...
def _build_by_signature(self, sig: Signature, origin_service_type: type = None) -> list[T]: ...
@abstractmethod
def _build_service(self, service_type: type, *args, **kwargs) -> object: