diff --git a/src/cpl_reactive_extensions/observable.py b/src/cpl_reactive_extensions/observable.py index 94dabc29..77561a5b 100644 --- a/src/cpl_reactive_extensions/observable.py +++ b/src/cpl_reactive_extensions/observable.py @@ -1,9 +1,8 @@ from __future__ import annotations -from typing import Callable, Any, Optional +from typing import Callable, Any, Optional, Type from cpl_core.type import T -from cpl_reactive_extensions.abc.operator import Operator from cpl_reactive_extensions.abc.subscribable import Subscribable from cpl_reactive_extensions.subscriber import Observer, Subscriber from cpl_reactive_extensions.subscription import Subscription @@ -19,6 +18,16 @@ class Observable(Subscribable): self._source: Optional[Observable] = None self._operator: Optional[Callable] = None + @staticmethod + def from_observable(obs: Observable): + def inner(subscriber: Subscriber): + if "subscribe" not in dir(obs): + raise TypeError("Unable to lift unknown Observable type") + + return obs.subscribe(subscriber) + + return Observable(inner) + @staticmethod def from_list(values: list): i = 0 diff --git a/src/cpl_reactive_extensions/operators/take_until.py b/src/cpl_reactive_extensions/operators/take_until.py new file mode 100644 index 00000000..a2a51f4e --- /dev/null +++ b/src/cpl_reactive_extensions/operators/take_until.py @@ -0,0 +1,14 @@ +from cpl_core.type import T +from cpl_reactive_extensions import Subscriber, Observable +from cpl_reactive_extensions.operator_subscriber import OperatorSubscriber +from cpl_reactive_extensions.utils import operate + + +def take_until(notifier: Observable): + def init(source: Observable, subscriber: Subscriber): + Observable.from_observable(notifier).subscribe(OperatorSubscriber(subscriber, lambda: subscriber.complete())) + + if not subscriber.closed: + source.subscribe(subscriber) + + return operate(init) diff --git a/src/cpl_reactive_extensions/subject.py b/src/cpl_reactive_extensions/subject.py index 2b2c40fc..95714d2e 100644 --- a/src/cpl_reactive_extensions/subject.py +++ b/src/cpl_reactive_extensions/subject.py @@ -1,4 +1,5 @@ -from typing import Any, Optional +from types import NoneType +from typing import Any, Optional, Type from cpl_core.type import T from cpl_reactive_extensions.abc.observer import Observer @@ -8,11 +9,11 @@ from cpl_reactive_extensions.subscription import Subscription class Subject(Observable, Observer): - def __init__(self, _t: type): + def __init__(self, _t: Type[T]): Observable.__init__(self) self.is_closed = False - self._t = _t + self._t = _t if _t is not None else NoneType self._current_observers: Optional[list[Observer]] = None self.closed = False diff --git a/unittests/unittests_reactive_extenstions/observable_operator.py b/unittests/unittests_reactive_extenstions/observable_operator.py index ce451735..760cbd04 100644 --- a/unittests/unittests_reactive_extenstions/observable_operator.py +++ b/unittests/unittests_reactive_extenstions/observable_operator.py @@ -1,9 +1,12 @@ +import time import traceback import unittest from cpl_core.console import Console +from cpl_reactive_extensions import Subject from cpl_reactive_extensions.interval import Interval from cpl_reactive_extensions.operators.take import take +from cpl_reactive_extensions.operators.take_until import take_until class ObservableOperatorTestCase(unittest.TestCase): @@ -20,8 +23,45 @@ class ObservableOperatorTestCase(unittest.TestCase): self._completed = True def test_take_two(self): + count = 0 + def sub(x): - Console.write_line(x) + nonlocal count + + count += 1 observable = Interval(0.1) - sub = observable.pipe(take(2)).subscribe(sub) + observable.pipe(take(2)).subscribe(sub) + time.sleep(0.5) + self.assertEqual(count, 2) + + def test_take_five(self): + count = 0 + + def sub(x): + nonlocal count + + count += 1 + + observable = Interval(0.1) + observable.pipe(take(5)).subscribe(sub) + time.sleep(1) + self.assertEqual(count, 5) + + def test_take_until(self): + count = 0 + unsubscriber = Subject(None) + + def sub(x): + nonlocal count + + count += 1 + + observable = Interval(0.1) + observable.pipe(take_until(unsubscriber)).subscribe(sub) + + timer = 2 + time.sleep(timer) + unsubscriber.next(None) + unsubscriber.complete() + self.assertEqual(count, timer * 10 - 1)