All checks were successful
Build on push / prepare (push) Successful in 10s
Build on push / core (push) Successful in 19s
Build on push / query (push) Successful in 18s
Build on push / dependency (push) Successful in 17s
Build on push / database (push) Successful in 15s
Build on push / translation (push) Successful in 18s
Build on push / mail (push) Successful in 19s
Build on push / application (push) Successful in 21s
Build on push / auth (push) Successful in 14s
Build on push / api (push) Successful in 14s
77 lines
3.1 KiB
Python
77 lines
3.1 KiB
Python
from keycloak import KeycloakAuthenticationError
|
|
from starlette.types import Scope, Receive, Send
|
|
|
|
from cpl.api.abc.asgi_middleware_abc import ASGIMiddleware
|
|
from cpl.api.api_logger import APILogger
|
|
from cpl.api.error import Unauthorized
|
|
from cpl.api.middleware.request import get_request
|
|
from cpl.api.router import Router
|
|
from cpl.auth.keycloak import KeycloakClient
|
|
from cpl.auth.schema import AuthUserDao, AuthUser
|
|
from cpl.dependency import ServiceProviderABC
|
|
|
|
_logger = APILogger(__name__)
|
|
|
|
|
|
class AuthenticationMiddleware(ASGIMiddleware):
|
|
|
|
@ServiceProviderABC.inject
|
|
def __init__(self, app, keycloak: KeycloakClient, user_dao: AuthUserDao):
|
|
ASGIMiddleware.__init__(self, app)
|
|
|
|
self._keycloak = keycloak
|
|
self._user_dao = user_dao
|
|
|
|
async def __call__(self, scope: Scope, receive: Receive, send: Send):
|
|
request = get_request()
|
|
url = request.url.path
|
|
|
|
if url not in Router.get_auth_required_routes():
|
|
_logger.trace(f"No authentication required for {url}")
|
|
return await self._app(scope, receive, send)
|
|
|
|
if not request.headers.get("Authorization"):
|
|
_logger.debug(f"Unauthorized access to {url}, missing Authorization header")
|
|
return await Unauthorized(f"Missing header Authorization").asgi_response(scope, receive, send)
|
|
|
|
auth_header = request.headers.get("Authorization", None)
|
|
if not auth_header or not auth_header.startswith("Bearer "):
|
|
return await Unauthorized("Invalid Authorization header").asgi_response(scope, receive, send)
|
|
|
|
token = auth_header.split("Bearer ")[1]
|
|
if not await self._verify_login(token):
|
|
_logger.debug(f"Unauthorized access to {url}, invalid token")
|
|
return await Unauthorized("Invalid token").asgi_response(scope, receive, send)
|
|
|
|
# check user exists in db, if not create
|
|
keycloak_id = self._keycloak.get_user_id(token)
|
|
if keycloak_id is None:
|
|
return await Unauthorized("Failed to get user id from token").asgi_response(scope, receive, send)
|
|
|
|
user = await self._get_or_crate_user(keycloak_id)
|
|
if user.deleted:
|
|
_logger.debug(f"Unauthorized access to {url}, user is deleted")
|
|
return await Unauthorized("User is deleted").asgi_response(scope, receive, send)
|
|
|
|
return await self._call_next(scope, receive, send)
|
|
|
|
async def _get_or_crate_user(self, keycloak_id: str) -> AuthUser:
|
|
existing = await self._user_dao.find_by_keycloak_id(keycloak_id)
|
|
if existing is not None:
|
|
return existing
|
|
|
|
user = AuthUser(0, keycloak_id)
|
|
uid = await self._user_dao.create(user)
|
|
return await self._user_dao.get_by_id(uid)
|
|
|
|
async def _verify_login(self, token: str) -> bool:
|
|
try:
|
|
token_info = self._keycloak.introspect(token)
|
|
return token_info.get("active", False)
|
|
except KeycloakAuthenticationError as e:
|
|
_logger.debug(f"Keycloak authentication error: {e}")
|
|
return False
|
|
except Exception as e:
|
|
_logger.error(f"Unexpected error during token verification: {e}")
|
|
return False
|