diff --git a/src/cpl_reactive_extensions/interval.py b/src/cpl_reactive_extensions/interval.py new file mode 100644 index 00000000..c33b0acc --- /dev/null +++ b/src/cpl_reactive_extensions/interval.py @@ -0,0 +1,47 @@ +import sched +import threading +import time +from typing import Callable + +from cpl_reactive_extensions.observable import Observable +from cpl_reactive_extensions.subscriber import Subscriber + + +class Interval(Observable): + def __init__(self, interval: float, callback: Callable = None): + self._interval = interval + callback = callback if callback is not None else self._default_callback + + def schedule(x: Subscriber): + scheduler = sched.scheduler(time.time, time.sleep) + scheduler.enter( + self._interval, + 1, + self._run, + (scheduler, x, callback), + ) + scheduler.run() + + def thread(x: Subscriber): + t = threading.Thread(target=schedule, args=(x,)) + t.start() + + Observable.__init__(self, thread) + self._i = 0 + + def _run(self, scheduler, x: Subscriber, callback: Callable): + if x.closed: + x.complete() + return + + scheduler.enter( + self._interval, + 1, + self._run, + (scheduler, x, callback), + ) + callback(x) + + def _default_callback(self, x: Subscriber): + x.next(self._i) + self._i += 1 diff --git a/unittests/unittests_reactive_extenstions/reactive_test_case.py b/unittests/unittests_reactive_extenstions/reactive_test_case.py index d584ed00..a374f0d5 100644 --- a/unittests/unittests_reactive_extenstions/reactive_test_case.py +++ b/unittests/unittests_reactive_extenstions/reactive_test_case.py @@ -1,13 +1,15 @@ import time import traceback import unittest +from datetime import datetime from threading import Timer from cpl_core.console import Console from cpl_reactive_extensions.behavior_subject import BehaviorSubject +from cpl_reactive_extensions.interval import Interval from cpl_reactive_extensions.observable import Observable -from cpl_reactive_extensions.subscriber import Observer from cpl_reactive_extensions.subject import Subject +from cpl_reactive_extensions.subscriber import Observer class ReactiveTestCase(unittest.TestCase): @@ -128,3 +130,21 @@ class ReactiveTestCase(unittest.TestCase): subject.subscribe(lambda x: Console.write_line("b", x)) subject.next(3) + + def test_interval_default(self): + wait = 10 + i = 0 + + def test_sub(x): + nonlocal i + self.assertEqual(x, i) + i += 1 + + observable = Interval(1.0) + sub = observable.subscribe(test_sub) + start = datetime.now() + + time.sleep(wait) + sub.unsubscribe() + end = datetime.now() + self.assertEqual(round((end - start).total_seconds()), wait)