open-redirect/api/src/redirector.py

130 lines
3.5 KiB
Python
Raw Normal View History

2025-01-09 16:52:34 +01:00
import asyncio
import sys
import eventlet
from eventlet import wsgi
from flask import Flask, request, Response, redirect, render_template
from core.database.database import Database
from core.environment import Environment
from core.logger import Logger
from data.schemas.public.short_url import ShortUrl
from data.schemas.public.short_url_dao import shortUrlDao
from data.schemas.public.short_url_visit import ShortUrlVisit
from data.schemas.public.short_url_visit_dao import shortUrlVisitDao
# Module-level logger, named after this module.
logger = Logger(__name__)
class Redirector(Flask):
    """Flask application subclass used by the redirect service."""

    def __init__(self, *args, **kwargs):
        """Forward all positional and keyword arguments to ``Flask``."""
        super().__init__(*args, **kwargs)
# Application instance that the route decorators below register on.
app = Redirector(__name__)
2025-01-09 22:51:08 +01:00
@app.route("/")
def index():
    """Answer the bare root URL with the 404 page and a 404 status."""
    not_found_page = render_template("404.html")
    return not_found_page, 404
2025-01-09 16:52:34 +01:00
@app.route("/<path:path>")
async def _handle_request(path: str):
    """Resolve ``path`` as a short URL and answer the request.

    The 404 page is returned when no short URL matches ``path``, or when the
    short URL has a domain whose name is not listed in the ``DOMAINS``
    environment setting. Otherwise the client is redirected directly, unless
    the short URL has ``loading_screen`` set and the User-Agent does not
    contain one of the known tool markers; in that case the visit is tracked
    and ``redirect.html`` is rendered.

    Args:
        path: The request path captured by the route.

    Returns:
        A rendered template (optionally paired with a status code) or the
        response produced by ``_handle_short_url``.
    """
    short_url = await _find_short_url_by_url(path)
    if short_url is None:
        return render_template("404.html"), 404

    allowed_domains = Environment.get("DOMAINS", list[str], [])
    domain = await short_url.domain
    domain_is_allowed = domain is None or domain.name in allowed_domains
    if not domain_is_allowed:
        return render_template("404.html"), 404

    # Requests whose User-Agent contains one of these markers always get the
    # plain redirect, never the loading screen.
    agent = request.headers.get("User-Agent", "").lower()
    tool_markers = ("wheregoes", "someothertool")
    is_known_tool = any(marker in agent for marker in tool_markers)

    if is_known_tool or not short_url.loading_screen:
        return await _handle_short_url(path, short_url)

    # Loading-screen variant: track the visit here because the redirect
    # helper (which normally tracks it) is not called on this branch.
    await _track_visit(short_url)
    return render_template(
        "redirect.html",
        key=short_url.short_url,
        target_url=_get_redirect_url(short_url.target_url),
    )
async def _handle_short_url(path: str, short_url: ShortUrl):
    """Track a visit for ``short_url`` and redirect the client to its target.

    Args:
        path: The request path that matched the short URL. It is not needed
            to build the redirect; the parameter is kept so existing callers
            keep working.
        short_url: The resolved short-URL record.

    Returns:
        The redirect response produced by ``_do_redirect``.
    """
    # The former "api/" prefix stripping only rebound the local ``path``,
    # which was never read afterwards, so that dead code has been removed.
    await _track_visit(short_url)
    return _do_redirect(short_url.target_url)
async def _track_visit(short_url: ShortUrl):
    """Store a visit record for ``short_url``.

    The record carries the short URL's id and the request's User-Agent
    header. Any exception raised while building or storing it is logged and
    swallowed, so a tracking failure does not fail the request.

    Args:
        short_url: The short URL that was visited.
    """
    try:
        visit = ShortUrlVisit(0, short_url.id, request.headers.get("User-Agent"))
        await shortUrlVisitDao.create(visit)
    except Exception as e:
        logger.error(f"Failed to update short url {short_url.short_url} with error", e)
async def _find_short_url_by_url(url: str) -> "ShortUrl | None":
    """Look up the short URL whose ``short_url`` field equals ``url``.

    Args:
        url: The short-URL key taken from the request path.

    Returns:
        Whatever ``shortUrlDao.find_single_by`` yields for the filter. The
        request handler checks the result against ``None``, so the
        annotation allows ``None`` for "no match".
    """
    criteria = {ShortUrl.short_url: url}
    return await shortUrlDao.find_single_by(criteria)
def _get_redirect_url(url: str) -> str:
# todo: multiple protocols like ts3://
if not url.startswith("http://") and not url.startswith("https://"):
url = f"http://{url}"
return url
def _do_redirect(url: str) -> Response:
    """Build a redirect response to ``url``.

    Args:
        url: The target URL; it is passed through ``_get_redirect_url``
            before being used as the redirect location.

    Returns:
        The response created by Flask's ``redirect``.
    """
    location = _get_redirect_url(url)
    return redirect(location)
async def configure():
    """Apply the environment-driven settings and start the database.

    Reads ``LOG_LEVEL`` (default ``"info"``) and ``ENVIRONMENT`` (default
    ``"production"``), turns on Flask debug mode when the environment is
    ``"development"``, and finally awaits ``Database.startup_db()``.
    """
    log_level = Environment.get("LOG_LEVEL", str, "info")
    Logger.set_level(log_level)

    configured_environment = Environment.get("ENVIRONMENT", str, "production")
    Environment.set_environment(configured_environment)
    logger.info(f"Environment: {Environment.get_environment()}")

    is_development = Environment.get_environment() == "development"
    app.debug = is_development

    await Database.startup_db()
def main():
    """Configure the service and serve the app with eventlet's WSGI server.

    ``configure()`` is run to completion on a dedicated event loop, which is
    then closed. After that the server listens on ``0.0.0.0`` at the port
    given by the ``PORT`` environment setting (default ``5001``).
    """
    if sys.platform == "win32":
        # Imported here because the policy class only exists on Windows.
        from asyncio import WindowsSelectorEventLoopPolicy

        asyncio.set_event_loop_policy(WindowsSelectorEventLoopPolicy())

    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(configure())
    finally:
        # Close the loop even when configuration raises, so it is not leaked.
        loop.close()

    port = Environment.get("PORT", int, 5001)
    logger.info(f"Start API on port: {port}")
    if Environment.get_environment() == "development":
        logger.info(f"Playground: http://localhost:{port}/")

    wsgi.server(eventlet.listen(("0.0.0.0", port)), app, log_output=False)
# Start the server only when this file is executed as a script.
if __name__ == "__main__":
    main()
# ((
# ( )
# ; / ,
# / \/
# / |
# / ~/
# / ) ) ~ edraft
# ___// | /
# --' \_~-,