Added interval

This commit is contained in:
Sven Heidemann 2023-04-16 00:49:38 +02:00
parent a463ac5274
commit 51803bf5d1
2 changed files with 68 additions and 1 deletions

View File

@ -0,0 +1,47 @@
import sched
import threading
import time
from typing import Callable
from cpl_reactive_extensions.observable import Observable
from cpl_reactive_extensions.subscriber import Subscriber
class Interval(Observable):
def __init__(self, interval: float, callback: Callable = None):
self._interval = interval
callback = callback if callback is not None else self._default_callback
def schedule(x: Subscriber):
scheduler = sched.scheduler(time.time, time.sleep)
scheduler.enter(
self._interval,
1,
self._run,
(scheduler, x, callback),
)
scheduler.run()
def thread(x: Subscriber):
t = threading.Thread(target=schedule, args=(x,))
t.start()
Observable.__init__(self, thread)
self._i = 0
def _run(self, scheduler, x: Subscriber, callback: Callable):
if x.closed:
x.complete()
return
scheduler.enter(
self._interval,
1,
self._run,
(scheduler, x, callback),
)
callback(x)
def _default_callback(self, x: Subscriber):
x.next(self._i)
self._i += 1

View File

@ -1,13 +1,15 @@
import time
import traceback
import unittest
from datetime import datetime
from threading import Timer
from cpl_core.console import Console
from cpl_reactive_extensions.behavior_subject import BehaviorSubject
from cpl_reactive_extensions.interval import Interval
from cpl_reactive_extensions.observable import Observable
from cpl_reactive_extensions.subscriber import Observer
from cpl_reactive_extensions.subject import Subject
from cpl_reactive_extensions.subscriber import Observer
class ReactiveTestCase(unittest.TestCase):
@ -128,3 +130,21 @@ class ReactiveTestCase(unittest.TestCase):
subject.subscribe(lambda x: Console.write_line("b", x))
subject.next(3)
def test_interval_default(self):
wait = 10
i = 0
def test_sub(x):
nonlocal i
self.assertEqual(x, i)
i += 1
observable = Interval(1.0)
sub = observable.subscribe(test_sub)
start = datetime.now()
time.sleep(wait)
sub.unsubscribe()
end = datetime.now()
self.assertEqual(round((end - start).total_seconds()), wait)