2023-04-15 19:14:36 +02:00
|
|
|
import time
|
2023-04-15 18:59:24 +02:00
|
|
|
import traceback
|
2023-04-15 16:17:31 +02:00
|
|
|
import unittest
|
2023-04-16 00:49:38 +02:00
|
|
|
from datetime import datetime
|
2023-04-15 16:17:31 +02:00
|
|
|
from threading import Timer
|
|
|
|
|
2023-04-15 18:59:24 +02:00
|
|
|
from cpl_core.console import Console
|
2023-04-16 21:46:17 +02:00
|
|
|
from cpl_reactive_extensions.subject.behavior_subject import BehaviorSubject
|
2023-04-16 00:49:38 +02:00
|
|
|
from cpl_reactive_extensions.interval import Interval
|
2023-04-15 16:17:31 +02:00
|
|
|
from cpl_reactive_extensions.observable import Observable
|
2023-04-16 21:46:17 +02:00
|
|
|
from cpl_reactive_extensions.subject.subject import Subject
|
|
|
|
from cpl_reactive_extensions.internal.subscriber import Observer
|
2023-04-15 16:17:31 +02:00
|
|
|
|
|
|
|
|
|
|
|
class ReactiveTestCase(unittest.TestCase):
    """Exercise observables, subjects and intervals of the reactive package.

    Each test starts with ``_error`` and ``_completed`` set to ``False``;
    ``_on_error`` and ``_on_complete`` are handed to ``subscribe`` as
    callbacks and flip the matching flag so a test can check afterwards
    whether the observable reported an error or a completion.
    """

    def setUp(self):
        """Reset the error and completion flags before every test."""
        self._error = False
        self._completed = False

    @staticmethod
    def _format_traceback(ex: BaseException) -> str:
        """Return the formatted traceback attached to ``ex``.

        ``traceback.format_exc()`` only works while an exception is being
        handled; when called from a plain callback it yields
        ``"NoneType: None"``. Formatting the exception object itself works
        in both situations.
        """
        return "".join(traceback.format_exception(type(ex), ex, ex.__traceback__))

    def _on_error(self, ex: Exception):
        """Error callback: log ``ex`` with its traceback and set ``_error``."""
        tb = self._format_traceback(ex)
        Console.error(f"Got error from observable: {ex}", tb)
        self._error = True

    def _on_complete(self):
        """Completion callback: set ``_completed``."""
        self._completed = True

    def _assert_interval_counts_up(self, observable, wait):
        """Subscribe to ``observable`` for ``wait`` seconds and verify it.

        Checks that the observation window lasted ``wait`` seconds (rounded)
        and that the received values form the sequence 0, 1, 2, ...

        The values are only recorded inside the subscriber and compared on
        the test thread afterwards, so a mismatch is reported by the test
        itself rather than raised inside the subscriber callback.
        """
        received = []

        def on_tick(x):
            received.append(x)

        sub = observable.subscribe(on_tick)
        # Monotonic clock: unaffected by wall-clock adjustments during the wait.
        start = time.monotonic()

        time.sleep(wait)
        sub.unsubscribe()
        elapsed = time.monotonic() - start
        self.assertEqual(round(elapsed), wait)

        # Snapshot first; a late tick may still arrive after unsubscribe.
        values = list(received)
        self.assertEqual(values, list(range(len(values))))

    def test_observer(self):
        """Three values are emitted on subscribe, a fourth plus completion later.

        The observable callback emits three values right away and schedules
        a timer that emits a fourth value and completes the observer.
        """
        called = 0
        test_x = 1
        # Timers started by the observable callback. They are joined below so
        # the final assertions run on the test thread, after the delayed
        # emission has actually happened.
        timers = []

        def callback(observer: Observer):
            nonlocal test_x
            observer.next(test_x)
            test_x += 1
            observer.next(test_x)
            test_x += 1
            observer.next(test_x)

            def complete():
                nonlocal test_x
                test_x += 1
                observer.next(test_x)
                observer.complete()

            timer = Timer(1.0, complete)
            timers.append(timer)
            timer.start()

        observable = Observable(callback)

        def on_next(x):
            nonlocal called
            called += 1
            self.assertEqual(test_x, x)

        self.assertEqual(called, 0)
        self.assertFalse(self._error)
        self.assertFalse(self._completed)
        observable.subscribe(
            on_next,
            self._on_error,
            self._on_complete,
        )
        self.assertEqual(called, 3)
        self.assertFalse(self._error)
        self.assertFalse(self._completed)

        # Wait for the delayed emission instead of asserting from a second
        # timer thread, where a failing assertion could not fail the test.
        for timer in timers:
            timer.join(timeout=5.0)
            self.assertFalse(timer.is_alive(), "delayed emission did not finish")

        self.assertEqual(called, 4)
        self.assertFalse(self._error)
        self.assertTrue(self._completed)

    def test_observer_completed(self):
        """Emitting after ``complete()`` is expected to set the error flag."""

        def _test_complete(x: Observer):
            x.next(1)
            x.next(2)
            x.complete()
            # Emission after completion; the assertion below expects the
            # error callback to have been invoked by the end of subscribe.
            x.next(3)

        observable = Observable(_test_complete)

        observable.subscribe(lambda x: Console.write_line(1, x), self._on_error)
        self.assertTrue(self._error)

    def test_observable_from(self):
        """Values from ``Observable.from_list`` are checked against 1, 2, 3, ..."""
        expected_x = 1

        def _next(x):
            nonlocal expected_x
            self.assertEqual(expected_x, x)
            expected_x += 1

        observable = Observable.from_list([1, 2, 3, 4])
        sub = observable.subscribe(
            _next,
            self._on_error,
        )
        self.assertFalse(self._error)
        sub.unsubscribe()

    def test_subject(self):
        """A subject subscribed to a list observable feeds two subscribers."""
        expected_x = 1

        def _next(calc, x):
            nonlocal expected_x
            self.assertEqual(expected_x, x)
            # Only the subscriber created with calc=True advances the
            # expectation; the other one just checks the value.
            # NOTE(review): this presumes subscribers are notified in
            # subscription order - confirm against Subject.
            if not calc:
                return
            expected_x += 1
            if expected_x == 4:
                expected_x = 1

        subject = Subject(int)
        subject.subscribe(lambda x: _next(False, x), self._on_error)
        subject.subscribe(lambda x: _next(True, x), self._on_error)

        observable = Observable.from_list([1, 2, 3])
        sub = observable.subscribe(subject, self._on_error)

        self.assertFalse(self._error)
        sub.unsubscribe()

    def test_behavior_subject(self):
        """Smoke test: print what two subscribers of a BehaviorSubject receive.

        Contains no assertions; it only fails if one of the calls raises.
        """
        subject = BehaviorSubject(int, 0)

        subject.subscribe(lambda x: Console.write_line("a", x))

        subject.next(1)
        subject.next(2)

        # Second subscriber joins after two values were already pushed.
        subject.subscribe(lambda x: Console.write_line("b", x))
        subject.next(3)
        subject.unsubscribe()

    def test_interval_default(self):
        """An ``Interval`` without a callback is expected to emit 0, 1, 2, ..."""
        self._assert_interval_counts_up(Interval(1.0), 10)

    def test_interval_custom(self):
        """An ``Interval`` with a callback emits whatever the callback pushes."""
        n = 0

        def callback(x: Observer):
            nonlocal n
            x.next(n)
            n += 1

        self._assert_interval_counts_up(Interval(1.0, callback), 10)
|