All checks were successful
Test before pr merge / test-lint (pull_request) Successful in 6s
31 lines
848 B
Python
31 lines
848 B
Python
import asyncio
|
|
from datetime import datetime
|
|
|
|
from cpl.core.console import Console
|
|
from cpl.core.time.cron import Cron
|
|
from cpl.core.service.cronjob import CronjobABC
|
|
from cpl.core.service.hosted_service import HostedService
|
|
|
|
|
|
class Hosted(HostedService):
    """Sample hosted service that writes a status line to the console in a loop.

    ``start`` keeps printing until ``stop`` has set the internal stop flag.
    """

    def __init__(self):
        # Polled by start() on every iteration; flipped to True by stop().
        self._stopped = False

    async def start(self):
        """Print a start message, then a running message every 5 seconds.

        The stop flag is only re-checked after each sleep completes, so the
        loop exits at the next iteration boundary once ``stop`` has run.
        """
        Console.write_line("Hosted Service Started")
        while True:
            if self._stopped:
                break
            Console.write_line("Hosted Service Running")
            await asyncio.sleep(5)

    async def stop(self):
        """Print a stop message and set the flag that ends the loop in ``start``."""
        Console.write_line("Hosted Service Stopped")
        self._stopped = True
|
|
|
|
|
|
class MyCronJob(CronjobABC):
    """Sample cron job that writes a timestamped greeting to the console."""

    def __init__(self):
        # Cron expression "*/1 * * * *": every minute.
        every_minute = Cron("*/1 * * * *")
        CronjobABC.__init__(self, every_minute)

    async def loop(self):
        """Write the current ``datetime.now()`` value and a greeting to the console.

        NOTE(review): presumably invoked by ``CronjobABC`` on each schedule
        tick - confirm against the base class.
        """
        now = datetime.now()
        Console.write_line(f"[{now}] Hello from Cronjob!")
|