open-redirect/api/src/redirector.py
edraft 2dac04d1a4
All checks were successful
Build on push / prepare (push) Successful in 15s
Build on push / build-api (push) Successful in 32s
Build on push / build-redirector (push) Successful in 33s
Build on push / build-web (push) Successful in 59s
Loading-screen default false
2025-03-11 18:49:43 +01:00

206 lines
6.1 KiB
Python

import asyncio
import sys
from typing import Optional
import requests
import uvicorn
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import RedirectResponse
from starlette.routing import Route, Mount
from starlette.staticfiles import StaticFiles
from starlette.templating import Jinja2Templates
from core.environment import Environment
from core.logger import Logger
logger = Logger(__name__)
templates = Jinja2Templates(directory="templates")
async def index(request: Request):
return templates.TemplateResponse("404.html", {"request": request}, status_code=404)
async def handle_request(request: Request):
path = request.path_params["path"]
short_url = _find_short_url_by_path(path)
if short_url is None:
return templates.TemplateResponse(
"404.html", {"request": request}, status_code=404
)
domains = Environment.get("DOMAINS", list[str], [])
domain = short_url["domain"]
logger.debug(
f"Domain: {domain["name"] if domain is not None else None}, request.host: {request.headers['host']}"
)
host = request.headers["host"]
if ":" in host:
host = host.split(":")[0]
domain_strict_mode = Environment.get("DOMAIN_STRICT_MODE", bool, False)
if domain is not None and (
domain["name"] not in domains
or (domain_strict_mode and not host.endswith(domain["name"]))
):
return templates.TemplateResponse(
"404.html", {"request": request}, status_code=404
)
user_agent = request.headers.get("User-Agent", "").lower()
if "wheregoes" in user_agent or "someothertool" in user_agent:
return await _handle_short_url(request, short_url)
if short_url["loadingScreen"]:
await _track_visit(request, short_url)
return templates.TemplateResponse(
"redirect.html",
{
"request": request,
"key": short_url["shortUrl"],
"target_url": _get_redirect_url(short_url["targetUrl"]),
},
)
return await _handle_short_url(request, short_url)
def _find_short_url_by_path(path: str) -> Optional[dict]:
api_url = Environment.get("API_URL", str)
if api_url is None:
raise Exception("API_URL is not set")
api_key = Environment.get("API_KEY", str)
if api_key is None:
raise Exception("API_KEY is not set")
request = requests.post(
f"{api_url}/graphql",
json={
"query": f"""
query getShortUrlByPath($path: String!) {{
shortUrls(filter: {{ shortUrl: {{ equal: $path }}, deleted: {{ equal: false }} }}) {{
nodes {{
id
shortUrl
targetUrl
description
group {{
id
name
}}
domain {{
id
name
}}
loadingScreen
deleted
}}
}}
}}
""",
"variables": {"path": path},
},
headers={"Authorization": f"API-Key {api_key}"},
)
data = request.json()["data"]["shortUrls"]["nodes"]
if len(data) == 0:
return None
return data[0]
async def _handle_short_url(request: Request, short_url: dict):
await _track_visit(request, short_url)
return RedirectResponse(_get_redirect_url(short_url["targetUrl"]))
async def _track_visit(r: Request, short_url: dict):
api_url = Environment.get("API_URL", str)
if api_url is None:
raise Exception("API_URL is not set")
api_key = Environment.get("API_KEY", str)
if api_key is None:
raise Exception("API_KEY is not set")
try:
request = requests.post(
f"{api_url}/graphql",
json={
"query": f"""
mutation trackShortUrlVisit($id: ID!, $agent: String) {{
shortUrl {{
trackVisit(id: $id, agent: $agent)
}}
}}
""",
"variables": {
"id": short_url["id"],
"agent": r.headers.get("User-Agent"),
},
},
headers={"Authorization": f"API-Key {api_key}"},
)
if request.status_code != 200:
logger.warning(
f"Failed to track visit for short url {short_url["shortUrl"]}"
)
data = request.json()
if "errors" in data:
raise Exception(data["errors"])
else:
logger.debug(f"Tracked visit for short url {short_url["shortUrl"]}")
except Exception as e:
logger.error(
f"Failed to update short url {short_url["shortUrl"]} with error", e
)
def _get_redirect_url(url: str) -> str:
protocols = Environment.get("PROTOCOLS", list[str], ["http", "https"])
if not any(url.startswith(f"{protocol}://") for protocol in protocols):
url = f"http://{url}"
return url
async def configure():
Logger.set_level(Environment.get("LOG_LEVEL", str, "info"))
Environment.set_environment(Environment.get("ENVIRONMENT", str, "production"))
logger.info(f"Environment: {Environment.get_environment()}")
routes = [
Route("/", endpoint=index),
Mount("/static", StaticFiles(directory="static"), name="static"),
Route("/{path:path}", endpoint=handle_request),
]
app = Starlette(routes=routes, on_startup=[configure])
def main():
if sys.platform == "win32":
from asyncio import WindowsSelectorEventLoopPolicy
asyncio.set_event_loop_policy(WindowsSelectorEventLoopPolicy())
uvicorn.run(
app,
host="0.0.0.0",
port=Environment.get("PORT", int, 5001),
log_config=None,
)
if __name__ == "__main__":
main()