118 lines
3.2 KiB
Python
118 lines
3.2 KiB
Python
|
import asyncio
|
||
|
import sys
|
||
|
|
||
|
import eventlet
|
||
|
from eventlet import wsgi
|
||
|
from flask import Flask, request, Response, redirect, render_template
|
||
|
|
||
|
from core.database.database import Database
|
||
|
from core.environment import Environment
|
||
|
from core.logger import Logger
|
||
|
from data.schemas.public.short_url import ShortUrl
|
||
|
from data.schemas.public.short_url_dao import shortUrlDao
|
||
|
from data.schemas.public.short_url_visit import ShortUrlVisit
|
||
|
from data.schemas.public.short_url_visit_dao import shortUrlVisitDao
|
||
|
|
||
|
logger = Logger(__name__)
|
||
|
|
||
|
|
||
|
class Redirector(Flask):
|
||
|
|
||
|
def __init__(self, *args, **kwargs):
|
||
|
Flask.__init__(self, *args, **kwargs)
|
||
|
|
||
|
|
||
|
app = Redirector(__name__)
|
||
|
|
||
|
|
||
|
@app.route("/<path:path>")
|
||
|
async def _handle_request(path: str):
|
||
|
short_url = await _find_short_url_by_url(path)
|
||
|
if short_url is None:
|
||
|
return render_template("404.html"), 404
|
||
|
|
||
|
user_agent = request.headers.get("User-Agent", "").lower()
|
||
|
|
||
|
if "wheregoes" in user_agent or "someothertool" in user_agent:
|
||
|
return await _handle_short_url(path, short_url)
|
||
|
|
||
|
if short_url.loading_screen:
|
||
|
return render_template(
|
||
|
"redirect.html",
|
||
|
key=short_url.short_url,
|
||
|
target_url=_get_redirect_url(short_url.target_url),
|
||
|
)
|
||
|
|
||
|
return await _handle_short_url(path, short_url)
|
||
|
|
||
|
|
||
|
async def _handle_short_url(path: str, short_url: ShortUrl):
|
||
|
if path.startswith("api/"):
|
||
|
path = path.replace("api/", "")
|
||
|
|
||
|
try:
|
||
|
await shortUrlVisitDao.create(
|
||
|
ShortUrlVisit(0, short_url.id, request.headers.get("User-Agent"))
|
||
|
)
|
||
|
except Exception as e:
|
||
|
logger.error(f"Failed to update short url {short_url.short_url} with error", e)
|
||
|
|
||
|
return _do_redirect(short_url.target_url)
|
||
|
|
||
|
|
||
|
async def _find_short_url_by_url(url: str) -> ShortUrl:
|
||
|
return await shortUrlDao.find_single_by({ShortUrl.short_url: url})
|
||
|
|
||
|
|
||
|
def _get_redirect_url(url: str) -> str:
|
||
|
# todo: multiple protocols like ts3://
|
||
|
if not url.startswith("http://") and not url.startswith("https://"):
|
||
|
url = f"http://{url}"
|
||
|
|
||
|
return url
|
||
|
|
||
|
|
||
|
def _do_redirect(url: str) -> Response:
|
||
|
return redirect(_get_redirect_url(url))
|
||
|
|
||
|
|
||
|
async def configure():
|
||
|
Logger.set_level(Environment.get("LOG_LEVEL", str, "info"))
|
||
|
Environment.set_environment(Environment.get("ENVIRONMENT", str, "production"))
|
||
|
logger.info(f"Environment: {Environment.get_environment()}")
|
||
|
|
||
|
app.debug = Environment.get_environment() == "development"
|
||
|
|
||
|
await Database.startup_db()
|
||
|
|
||
|
|
||
|
def main():
|
||
|
if sys.platform == "win32":
|
||
|
from asyncio import WindowsSelectorEventLoopPolicy
|
||
|
|
||
|
asyncio.set_event_loop_policy(WindowsSelectorEventLoopPolicy())
|
||
|
|
||
|
loop = asyncio.new_event_loop()
|
||
|
loop.run_until_complete(configure())
|
||
|
loop.close()
|
||
|
|
||
|
port = Environment.get("PORT", int, 5001)
|
||
|
logger.info(f"Start API on port: {port}")
|
||
|
if Environment.get_environment() == "development":
|
||
|
logger.info(f"Playground: http://localhost:{port}/")
|
||
|
wsgi.server(eventlet.listen(("0.0.0.0", port)), app, log_output=False)
|
||
|
|
||
|
|
||
|
if __name__ == "__main__":
|
||
|
main()
|
||
|
|
||
|
# ((
|
||
|
# ( )
|
||
|
# ; / ,
|
||
|
# / \/
|
||
|
# / |
|
||
|
# / ~/
|
||
|
# / ) ) ~ edraft
|
||
|
# ___// | /
|
||
|
# --' \_~-,
|