Files
cpl/example/api/src/main.py
edraft 56a16cbeba
Some checks failed
Test before pr merge / test-lint (pull_request) Failing after 6s
Build on push / prepare (push) Successful in 10s
Build on push / query (push) Successful in 19s
Build on push / core (push) Successful in 20s
Build on push / dependency (push) Successful in 17s
Build on push / application (push) Successful in 15s
Build on push / database (push) Successful in 16s
Build on push / mail (push) Successful in 18s
Build on push / translation (push) Successful in 22s
Build on push / auth (push) Successful in 18s
Build on push / api (push) Successful in 17s
Module dependencies as static var
2025-09-26 08:46:30 +02:00

87 lines
2.5 KiB
Python

from starlette.responses import JSONResponse
from cpl.api.api_module import ApiModule
from cpl.api.application.web_app import WebApp
from cpl.application.application_builder import ApplicationBuilder
from cpl.auth import AuthModule
from cpl.auth.permission.permissions import Permissions
from cpl.auth.schema import AuthUser, Role
from cpl.core.configuration import Configuration
from cpl.core.console import Console
from cpl.core.environment import Environment
from cpl.core.utils.cache import Cache
from cpl.database.mysql.mysql_module import MySQLModule
from scoped_service import ScopedService
from service import PingService
def main():
builder = ApplicationBuilder[WebApp](WebApp)
Configuration.add_json_file(f"appsettings.json")
Configuration.add_json_file(f"appsettings.{Environment.get_environment()}.json")
Configuration.add_json_file(f"appsettings.{Environment.get_host_name()}.json", optional=True)
# builder.services.add_logging()
builder.services.add_structured_logging()
builder.services.add_transient(PingService)
builder.services.add_module(MySQLModule)
builder.services.add_module(ApiModule)
builder.services.add_scoped(ScopedService)
builder.services.add_cache(AuthUser)
builder.services.add_cache(Role)
app = builder.build()
app.with_logging()
app.with_database()
app.with_authentication()
app.with_authorization()
app.with_route(
path="/route1",
fn=lambda r: JSONResponse("route1"),
method="GET",
authentication=True,
permissions=[Permissions.administrator],
)
app.with_routes_directory("routes")
provider = builder.service_provider
user_cache = provider.get_service(Cache[AuthUser])
role_cache = provider.get_service(Cache[Role])
if role_cache == user_cache:
raise Exception("Cache service is not working")
s1 = provider.get_service(ScopedService)
s2 = provider.get_service(ScopedService)
if s1.name == s2.name:
raise Exception("Scoped service is not working")
with provider.create_scope() as scope:
s3 = scope.get_service(ScopedService)
s4 = scope.get_service(ScopedService)
if s3.name != s4.name:
raise Exception("Scoped service is not working")
if s1.name == s3.name:
raise Exception("Scoped service is not working")
Console.write_line(
s1.name,
s2.name,
s3.name,
s4.name,
)
app.run()
if __name__ == "__main__":
main()