diff --git a/src/cpl_reactive_extensions/abc/observer.py b/src/cpl_reactive_extensions/abc/observer.py index e44f4174..be2b0c70 100644 --- a/src/cpl_reactive_extensions/abc/observer.py +++ b/src/cpl_reactive_extensions/abc/observer.py @@ -1,11 +1,11 @@ -from abc import abstractmethod, ABC +from abc import abstractmethod from cpl_core.type import T -class Observer(ABC): +class Observer: def __init__(self): - ABC.__init__(self) + pass @abstractmethod def next(self, value: T): diff --git a/src/cpl_reactive_extensions/observable.py b/src/cpl_reactive_extensions/observable.py index 609e20ff..57619f0d 100644 --- a/src/cpl_reactive_extensions/observable.py +++ b/src/cpl_reactive_extensions/observable.py @@ -1,9 +1,10 @@ -from typing import Callable, Union, Any, Optional +from typing import Callable, Any, Optional from cpl_reactive_extensions.abc.operator import Operator from cpl_reactive_extensions.abc.subscribable import Subscribable from cpl_reactive_extensions.subscriber import Observer, Subscriber from cpl_reactive_extensions.subscription import Subscription +from cpl_reactive_extensions.type import ObserverOrCallable class Observable(Subscribable): @@ -49,8 +50,14 @@ class Observable(Subscribable): def _subscribe(self, subscriber: Subscriber) -> Subscription: return self._source.subscribe(subscriber) + def _try_subscribe(self, subscriber: Subscriber) -> Subscription: + try: + return self._subscribe(subscriber) + except Exception as e: + subscriber.error(e) + def subscribe( - self, observer_or_next: Union[Callable, Observer], on_error: Callable = None, on_complete: Callable = None + self, observer_or_next: ObserverOrCallable, on_error: Callable = None, on_complete: Callable = None ) -> Subscription: subscriber = ( observer_or_next @@ -68,12 +75,6 @@ class Observable(Subscribable): return subscriber - def _try_subscribe(self, subscriber: Subscriber): - try: - return self._subscribe(subscriber) - except Exception as e: - subscriber.error(e) - def _call(self, observer: Observer): try: self._subscribe(observer) diff --git a/src/cpl_reactive_extensions/subject.py b/src/cpl_reactive_extensions/subject.py index 864ea2b2..0ae3f134 100644 --- a/src/cpl_reactive_extensions/subject.py +++ b/src/cpl_reactive_extensions/subject.py @@ -1,9 +1,100 @@ +from typing import Any, Optional + from cpl_core.type import T +from cpl_reactive_extensions.abc.observer import Observer from cpl_reactive_extensions.observable import Observable +from cpl_reactive_extensions.subscriber import Subscriber +from cpl_reactive_extensions.subscription import Subscription -class Subject(Observable): +class Subject(Observable, Observer): def __init__(self, _t: type): Observable.__init__(self) + self.is_closed = False self._t = _t + self._current_observers: Optional[list[Observer]] = None + + self.closed = False + self.observers: list[Observer] = [] + self.is_stopped = False + self.has_error = False + self.raised_error: Any = None + + @property + def observed(self) -> bool: + return len(self.observers) > 0 + + def _raise_if_closed(self): + if not self.closed: + return + raise Exception("Subject is unsubscribed!") + + def next(self, value: T): + self._raise_if_closed() + + if not isinstance(value, self._t): + raise TypeError() + + if self.is_stopped: + return + + if self._current_observers is None: + self._current_observers = self.observers + + for observer in self._current_observers: + observer.next(value) + + def error(self, error: Exception): + self._raise_if_closed() + if self.is_stopped: + return + + self.is_stopped = True + self.has_error = self.is_stopped + for observer in self.observers: + observer.error(error) + + def complete(self): + self._raise_if_closed() + + if self.is_stopped: + return + + self.is_stopped = True + for observer in self.observers: + observer.complete() + + def unsubscribe(self): + self.is_stopped = True + self.is_closed = True + self._current_observers = None + self.observers = [] + + def _try_subscribe(self, subscriber: Subscriber): + self._raise_if_closed() + return super()._try_subscribe(subscriber) + + def _subscribe(self, subscriber: Subscriber) -> Subscription: + self._raise_if_closed() + self._check_finalized_statuses(subscriber) + return self._inner_subscribe(subscriber) + + def _check_finalized_statuses(self, subscriber: Subscriber): + if self.has_error: + subscriber.error(self.raised_error) + elif self.is_stopped: + subscriber.complete() + + def _inner_subscribe(self, subscriber: Subscriber) -> Optional[Subscription]: + if self.has_error or self.is_stopped: + return Subscription.empty() + + self._current_observers = None + self.observers.append(subscriber) + + def _initial(): + self._current_observers = None + self.observers.remove(subscriber) + + return Subscription(_initial) diff --git a/src/cpl_reactive_extensions/subscriber.py b/src/cpl_reactive_extensions/subscriber.py index 41289c84..ae12d78a 100644 --- a/src/cpl_reactive_extensions/subscriber.py +++ b/src/cpl_reactive_extensions/subscriber.py @@ -3,14 +3,22 @@ from typing import Callable from cpl_core.type import T from cpl_reactive_extensions.abc.observer import Observer from cpl_reactive_extensions.subscription import Subscription +from cpl_reactive_extensions.type import ObserverOrCallable class Subscriber(Subscription, Observer): - def __init__(self, on_next: Callable, on_error: Callable = None, on_complete: Callable = None): + def __init__( + self, on_next_or_observer: ObserverOrCallable, on_error: Callable = None, on_complete: Callable = None + ): Subscription.__init__(self) - self._on_next = on_next - self._on_error = on_error - self._on_complete = on_complete + if isinstance(on_next_or_observer, Observer): + self._on_next = on_next_or_observer.next + self._on_error = on_next_or_observer.error + self._on_complete = on_next_or_observer.complete + else: + self._on_next = on_next_or_observer + self._on_error = on_error + self._on_complete = on_complete def next(self, value: T): if self._closed: @@ -29,3 +37,9 @@ class Subscriber(Subscription, Observer): return self._on_complete() + + def unsubscribe(self): + if self._closed: + return + + super().unsubscribe() diff --git a/src/cpl_reactive_extensions/subscription.py b/src/cpl_reactive_extensions/subscription.py index 12046260..70c5afa2 100644 --- a/src/cpl_reactive_extensions/subscription.py +++ b/src/cpl_reactive_extensions/subscription.py @@ -6,13 +6,18 @@ from cpl_reactive_extensions.abc.unsubscribable import Unsubscribable class Subscription(Unsubscribable): + @staticmethod + def empty(): + empty = Subscription() + empty.closed = True + return empty + def __init__(self, initial_teardown: Optional[Callable] = None): Unsubscribable.__init__(self) self._initial_teardown = initial_teardown self._closed = False - self._subscriptions = [] self._parentage: list[Subscription] = [] self._finalizers: list[Subscription] = [] @@ -21,9 +26,20 @@ class Subscription(Unsubscribable): def closed(self) -> bool: return self._closed + @closed.setter + def closed(self, value: bool): + self._closed = value + def _add_parent(self, parent: Subscription): self._parentage.append(parent) + def _remove_parent(self, parent: Subscription): + if self == parent: + self._parentage.clear() + return + + self._parentage.remove(parent) + def _has_parent(self, parent: Subscription) -> bool: return parent in self._parentage @@ -52,7 +68,7 @@ class Subscription(Unsubscribable): except Exception as e: print(e) - self._subscriptions.clear() + self._finalizers = None def add(self, tear_down: Union[Subscription, Unsubscribable]): if tear_down is None or tear_down == self: @@ -68,7 +84,10 @@ class Subscription(Unsubscribable): tear_down._add_parent(self) - self._subscriptions.append(tear_down) + self._finalizers.append(tear_down) def remove(self, tear_down: Union[Subscription, Unsubscribable]): - self._subscriptions.remove(tear_down) + self._finalizers.remove(tear_down) + + if isinstance(tear_down, Subscription): + tear_down._remove_parent(self) diff --git a/src/cpl_reactive_extensions/type.py b/src/cpl_reactive_extensions/type.py new file mode 100644 index 00000000..4c478ec6 --- /dev/null +++ b/src/cpl_reactive_extensions/type.py @@ -0,0 +1,5 @@ +from typing import Callable, Union + +from cpl_reactive_extensions.abc.observer import Observer + +ObserverOrCallable = Union[Callable, Observer] diff --git a/unittests/unittests_reactive_extenstions/reactive_test_case.py b/unittests/unittests_reactive_extenstions/reactive_test_case.py index 68952867..d584ed00 100644 --- a/unittests/unittests_reactive_extenstions/reactive_test_case.py +++ b/unittests/unittests_reactive_extenstions/reactive_test_case.py @@ -100,16 +100,18 @@ class ReactiveTestCase(unittest.TestCase): def test_subject(self): expected_x = 1 - def _next(x): + def _next(calc, x): nonlocal expected_x self.assertEqual(expected_x, x) + if not calc: + return expected_x += 1 if expected_x == 4: expected_x = 1 subject = Subject(int) - subject.subscribe(_next, self._on_error) - subject.subscribe(_next, self._on_error) + subject.subscribe(lambda x: _next(False, x), self._on_error) + subject.subscribe(lambda x: _next(True, x), self._on_error) observable = Observable.from_list([1, 2, 3]) observable.subscribe(subject, self._on_error)