"""Flask application that resolves short URLs and redirects to their targets."""

import asyncio
import sys
from typing import Optional

import eventlet
from eventlet import wsgi
from flask import Flask, request, Response, redirect, render_template

from core.database.database import Database
from core.environment import Environment
from core.logger import Logger
from data.schemas.public.short_url import ShortUrl
from data.schemas.public.short_url_dao import shortUrlDao
from data.schemas.public.short_url_visit import ShortUrlVisit
from data.schemas.public.short_url_visit_dao import shortUrlVisitDao

logger = Logger(__name__)


class Redirector(Flask):
    """Flask application subclass used for the redirect service."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)


app = Redirector(__name__)


@app.route("/")
def index():
    """Answer the bare root URL with the 404 page (there is no landing page)."""
    return render_template("404.html"), 404


# The rule needs a URL variable: registered on "/" it would collide with
# ``index`` and the view would never receive its ``path`` argument.
@app.route("/<path:path>")
async def _handle_request(path: str):
    """Resolve ``path`` to a short URL and redirect or render the loading page.

    Args:
        path: Request path without the leading slash; used as the short URL key.

    Returns:
        The 404 page with status 404 when the short URL is unknown, has no
        domain, or the domain does not match; otherwise either the rendered
        ``redirect.html`` loading screen or a redirect response.
    """
    short_url = await _find_short_url_by_url(path)
    if short_url is None:
        return render_template("404.html"), 404

    domain = await short_url.domain
    # Check for a missing domain before touching ``domain.name`` below.
    if domain is None:
        return render_template("404.html"), 404

    logger.debug(f"Domain: {domain.name}, request.host: {request.host}")

    # Drop an explicit port so only the host name is compared.
    host = request.host
    if ":" in host:
        host = host.split(":")[0]

    # Reject only when the domain is neither configured in DOMAINS nor a
    # suffix of the requested host.
    domains = Environment.get("DOMAINS", list[str], [])
    if domain.name not in domains and not host.endswith(domain.name):
        return render_template("404.html"), 404

    # Clients identifying with these markers always get the direct redirect,
    # never the loading screen (presumably redirect-checker tools - confirm).
    user_agent = request.headers.get("User-Agent", "").lower()
    if "wheregoes" in user_agent or "someothertool" in user_agent:
        return await _handle_short_url(path, short_url)

    if short_url.loading_screen:
        await _track_visit(short_url)
        return render_template(
            "redirect.html",
            key=short_url.short_url,
            target_url=_get_redirect_url(short_url.target_url),
        )

    return await _handle_short_url(path, short_url)


async def _handle_short_url(path: str, short_url: ShortUrl):
    """Record a visit and redirect to the target of ``short_url``.

    Args:
        path: Requested path. Currently unused; kept so existing callers
            keep working.
        short_url: The resolved short URL entry.

    Returns:
        A redirect response to the (normalized) target URL.
    """
    await _track_visit(short_url)
    return _do_redirect(short_url.target_url)


async def _track_visit(short_url: ShortUrl):
    """Store a visit record for ``short_url``; failures are logged, not raised."""
    try:
        await shortUrlVisitDao.create(
            ShortUrlVisit(0, short_url.id, request.headers.get("User-Agent"))
        )
    except Exception as e:
        # Best effort: a failed visit record must not break the redirect.
        logger.error(f"Failed to update short url {short_url.short_url} with error", e)


async def _find_short_url_by_url(url: str) -> Optional[ShortUrl]:
    """Look up the short URL entry whose ``short_url`` field equals ``url``.

    The caller treats a ``None`` result as "not found".
    """
    return await shortUrlDao.find_single_by({ShortUrl.short_url: url})


def _get_redirect_url(url: str) -> str:
    """Return ``url`` with ``http://`` prepended unless it already has a web scheme."""
    # todo: multiple protocols like ts3://
    if not url.startswith(("http://", "https://")):
        url = f"http://{url}"
    return url


def _do_redirect(url: str) -> Response:
    """Build a redirect response to the normalized form of ``url``."""
    return redirect(_get_redirect_url(url))


async def configure():
    """Apply log level and environment settings, then start the database."""
    Logger.set_level(Environment.get("LOG_LEVEL", str, "info"))
    Environment.set_environment(Environment.get("ENVIRONMENT", str, "production"))
    logger.info(f"Environment: {Environment.get_environment()}")

    app.debug = Environment.get_environment() == "development"

    await Database.startup_db()


def main():
    """Configure the service and serve the app through eventlet's WSGI server."""
    if sys.platform == "win32":
        # Use the selector event loop on Windows
        # (presumably required by the database driver - confirm).
        from asyncio import WindowsSelectorEventLoopPolicy

        asyncio.set_event_loop_policy(WindowsSelectorEventLoopPolicy())

    # Run the async startup to completion on a dedicated loop before serving.
    loop = asyncio.new_event_loop()
    loop.run_until_complete(configure())
    loop.close()

    port = Environment.get("PORT", int, 5001)
    logger.info(f"Start API on port: {port}")
    if Environment.get_environment() == "development":
        logger.info(f"Playground: http://localhost:{port}/")
    wsgi.server(eventlet.listen(("0.0.0.0", port)), app, log_output=False)


if __name__ == "__main__":
    main()

#                  ((
#                 ( )
#                  ; / ,
#                 / \/
#                / |
#               / ~/
#              / ) ) ~ edraft
#         ___// | /
#         --' \_~-,