204 lines
6.0 KiB
Python
204 lines
6.0 KiB
Python
import asyncio
|
|
import sys
|
|
from typing import Optional
|
|
|
|
import requests
|
|
import uvicorn
|
|
from starlette.applications import Starlette
|
|
from starlette.requests import Request
|
|
from starlette.responses import RedirectResponse
|
|
from starlette.routing import Route, Mount
|
|
from starlette.staticfiles import StaticFiles
|
|
from starlette.templating import Jinja2Templates
|
|
|
|
from core.environment import Environment
|
|
from core.logger import Logger
|
|
|
|
# Jinja2 template loader for the HTML pages, rooted at the "templates" directory.
templates = Jinja2Templates(directory="templates")

# Module-level logger, named after this module.
logger = Logger(__name__)
|
|
|
|
|
|
async def index(request: Request):
    """Render ``404.html`` with HTTP status 404.

    Args:
        request: The incoming request; handed to the template context.

    Returns:
        The rendered template response carrying status code 404.
    """
    context = {"request": request}
    return templates.TemplateResponse("404.html", context, status_code=404)
|
|
|
|
|
|
def _render_not_found(request: Request):
    """Render ``404.html`` with HTTP status 404 for *request*."""
    return templates.TemplateResponse(
        "404.html", {"request": request}, status_code=404
    )


def _host_matches_domain(host: str, domain_name: str) -> bool:
    """Return True when *host* is *domain_name* itself or one of its subdomains.

    The comparison is case-insensitive and respects DNS label boundaries, so
    ``evilexample.com`` does not match ``example.com``.
    """
    normalized_host = host.lower().rstrip(".")
    normalized_name = domain_name.lower().strip(".")
    return normalized_host == normalized_name or normalized_host.endswith(
        f".{normalized_name}"
    )


async def handle_request(request: Request):
    """Resolve the requested path to a short URL and answer the visit.

    Flow:
        1. Look up the short URL for the ``path`` route parameter; when none
           is found, render ``404.html`` with status 404.
        2. When the short URL has a domain, render the 404 page unless the
           domain name is listed in ``DOMAINS`` and, if ``DOMAIN_STRICT_MODE``
           is enabled, the request host is that domain or a subdomain of it.
        3. User agents containing ``wheregoes`` or ``someothertool`` always
           get the tracked plain redirect.
        4. When ``loadingScreen`` is truthy, track the visit and render
           ``redirect.html``; otherwise track the visit and redirect.

    Args:
        request: The incoming Starlette request.

    Returns:
        A template response (404 page or loading screen) or a redirect.
    """
    path = request.path_params["path"]
    # The lookup uses the blocking ``requests`` client, so it runs in a worker
    # thread to keep the event loop free for other requests.
    short_url = await asyncio.to_thread(_find_short_url_by_path, path)
    if short_url is None:
        return _render_not_found(request)

    allowed_domains = Environment.get("DOMAINS", list[str], [])
    domain = short_url["domain"]
    # A missing Host header must not crash the handler with a KeyError.
    host_header = request.headers.get("host", "")
    domain_name = domain["name"] if domain is not None else None
    logger.debug(f"Domain: {domain_name}, request.host: {host_header}")

    # Drop an optional ":port" suffix.
    host = host_header.split(":", 1)[0]

    domain_strict_mode = Environment.get("DOMAIN_STRICT_MODE", bool, False)
    if domain is not None:
        if domain_name not in allowed_domains:
            return _render_not_found(request)
        if domain_strict_mode and not _host_matches_domain(host, domain_name):
            return _render_not_found(request)

    user_agent = request.headers.get("User-Agent", "").lower()
    if "wheregoes" in user_agent or "someothertool" in user_agent:
        return await _handle_short_url(request, short_url)

    if short_url["loadingScreen"]:
        await _track_visit(request, short_url)
        return templates.TemplateResponse(
            "redirect.html",
            {
                "request": request,
                "key": short_url["shortUrl"],
                "target_url": _get_redirect_url(short_url["targetUrl"]),
            },
        )

    return await _handle_short_url(request, short_url)
|
|
|
|
|
|
def _find_short_url_by_path(path: str) -> Optional[dict]:
    """Fetch the non-deleted short URL whose ``shortUrl`` equals *path*.

    Sends the ``getShortUrlByPath`` GraphQL query to ``{API_URL}/graphql``,
    authenticated with ``API_KEY``.

    Args:
        path: The short URL key to look up.

    Returns:
        The first matching node as a dict, or ``None`` when no node matches.

    Raises:
        Exception: If ``API_URL`` or ``API_KEY`` is not set, or if the API
            response carries no ``data`` (the reported errors are included).
    """
    api_url = Environment.get("API_URL", str)
    if api_url is None:
        raise Exception("API_URL is not set")

    api_key = Environment.get("API_KEY", str)
    if api_key is None:
        raise Exception("API_KEY is not set")

    query = """
        query getShortUrlByPath($path: String!) {
            shortUrls(filter: { shortUrl: { equal: $path }, deleted: { equal: false } }) {
                nodes {
                    id
                    shortUrl
                    targetUrl
                    description
                    group {
                        id
                        name
                    }
                    domain {
                        id
                        name
                    }
                    loadingScreen
                    deleted
                }
            }
        }
    """
    response = requests.post(
        f"{api_url}/graphql",
        json={"query": query, "variables": {"path": path}},
        headers={"Authorization": f"API-Key {api_key}"},
        # Without a timeout, an unresponsive API would hang this call forever.
        timeout=10,
    )

    payload = response.json()
    data = payload.get("data")
    if data is None:
        # A failed GraphQL request has no "data"; report its errors instead of
        # failing later with an opaque TypeError/KeyError.
        raise Exception(f"Short URL lookup failed: {payload.get('errors')}")

    nodes = data["shortUrls"]["nodes"]
    if not nodes:
        return None

    return nodes[0]
|
|
|
|
|
|
async def _handle_short_url(request: Request, short_url: dict):
    """Track the visit, then redirect the client to the short URL's target.

    Args:
        request: The incoming request, forwarded to the visit tracker.
        short_url: The short URL record; its ``targetUrl`` is the destination.

    Returns:
        A ``RedirectResponse`` pointing at the normalized target URL.
    """
    await _track_visit(request, short_url)

    destination = _get_redirect_url(short_url["targetUrl"])
    return RedirectResponse(destination)
|
|
|
|
|
|
async def _track_visit(r: Request, short_url: dict):
    """Report a visit of *short_url* to the API (best effort).

    Sends the ``trackShortUrlVisit`` GraphQL mutation to
    ``{API_URL}/graphql`` with the short URL id and the visitor's
    ``User-Agent``. Failures of the call itself are logged and swallowed.

    Args:
        r: The incoming request; only its ``User-Agent`` header is read.
        short_url: The short URL record; ``id`` and ``shortUrl`` are used.

    Raises:
        Exception: If ``API_URL`` or ``API_KEY`` is not set.
    """
    api_url = Environment.get("API_URL", str)
    if api_url is None:
        raise Exception("API_URL is not set")

    api_key = Environment.get("API_KEY", str)
    if api_key is None:
        raise Exception("API_KEY is not set")

    key = short_url["shortUrl"]
    mutation = """
        mutation trackShortUrlVisit($id: ID!, $agent: String) {
            shortUrl {
                trackVisit(id: $id, agent: $agent)
            }
        }
    """

    try:
        # ``requests`` is blocking; run it in a worker thread so the event
        # loop is not stalled. The timeout bounds how long a dead API can
        # hold up the visit.
        response = await asyncio.to_thread(
            requests.post,
            f"{api_url}/graphql",
            json={
                "query": mutation,
                "variables": {
                    "id": short_url["id"],
                    "agent": r.headers.get("User-Agent"),
                },
            },
            headers={"Authorization": f"API-Key {api_key}"},
            timeout=10,
        )

        succeeded = response.status_code == 200
        if not succeeded:
            logger.warning(f"Failed to track visit for short url {key}")

        data = response.json()
        if "errors" in data:
            raise Exception(data["errors"])

        # Only claim success when the HTTP status was OK as well.
        if succeeded:
            logger.debug(f"Tracked visit for short url {key}")
    except Exception as e:
        # Tracking is best effort: never let it break the redirect itself.
        logger.error(f"Failed to update short url {key} with error", e)
|
|
|
|
|
|
def _get_redirect_url(url: str) -> str:
|
|
if not url.startswith("http://") and not url.startswith("https://"):
|
|
url = f"http://{url}"
|
|
|
|
return url
|
|
|
|
|
|
async def configure():
    """Startup hook: apply the log level and environment name, then log the latter.

    Reads ``LOG_LEVEL`` (default ``"info"``) and ``ENVIRONMENT`` (default
    ``"production"``) through ``Environment.get``.
    """
    log_level = Environment.get("LOG_LEVEL", str, "info")
    Logger.set_level(log_level)

    environment_name = Environment.get("ENVIRONMENT", str, "production")
    Environment.set_environment(environment_name)

    logger.info(f"Environment: {Environment.get_environment()}")
|
|
|
|
|
|
# Starlette tries routes in list order, so the root page and the static mount
# are listed before the catch-all short URL route.
routes = [
    Route("/", index),
    Mount("/static", app=StaticFiles(directory="static"), name="static"),
    Route("/{path:path}", handle_request),
]

# ASGI application; ``configure`` runs once on startup.
app = Starlette(on_startup=[configure], routes=routes)
|
|
|
|
|
|
def main():
    """Serve ``app`` with uvicorn on ``0.0.0.0`` and port ``PORT`` (default 5001).

    On Windows the selector event loop policy is installed before the server
    starts.
    """
    if sys.platform == "win32":
        # The policy class only exists on Windows, so it is resolved inside
        # the platform check.
        selector_policy = asyncio.WindowsSelectorEventLoopPolicy()
        asyncio.set_event_loop_policy(selector_policy)

    port = Environment.get("PORT", int, 5001)
    # NOTE(review): log_config=None presumably keeps uvicorn from replacing
    # the project's own logging setup — confirm.
    uvicorn.run(app, host="0.0.0.0", port=port, log_config=None)


if __name__ == "__main__":
    main()
|