diff --git a/src/cpl_reactive_extensions/abc/__init__.py b/src/cpl_reactive_extensions/abc/__init__.py new file mode 100644 index 00000000..425ab6c1 --- /dev/null +++ b/src/cpl_reactive_extensions/abc/__init__.py @@ -0,0 +1 @@ +# imports diff --git a/src/cpl_reactive_extensions/abc/observer.py b/src/cpl_reactive_extensions/abc/observer.py new file mode 100644 index 00000000..e44f4174 --- /dev/null +++ b/src/cpl_reactive_extensions/abc/observer.py @@ -0,0 +1,20 @@ +from abc import abstractmethod, ABC + +from cpl_core.type import T + + +class Observer(ABC): + def __init__(self): + ABC.__init__(self) + + @abstractmethod + def next(self, value: T): + pass + + @abstractmethod + def error(self, ex: Exception): + pass + + @abstractmethod + def complete(self): + pass diff --git a/src/cpl_reactive_extensions/abc/subscribable.py b/src/cpl_reactive_extensions/abc/subscribable.py new file mode 100644 index 00000000..5b5bed79 --- /dev/null +++ b/src/cpl_reactive_extensions/abc/subscribable.py @@ -0,0 +1,16 @@ +from abc import ABC, abstractmethod +from typing import Union, Callable + +from cpl_reactive_extensions.abc.observer import Observer +from cpl_reactive_extensions.abc.unsubscribable import Unsubscribable + + +class Subscribable(ABC): + def __init__(self): + ABC.__init__(self) + + @abstractmethod + def subscribe( + self, observer_or_next: Union[Callable, Observer], on_error: Callable = None, on_complete: Callable = None + ) -> Unsubscribable: + pass diff --git a/src/cpl_reactive_extensions/abc/unsubscribable.py b/src/cpl_reactive_extensions/abc/unsubscribable.py new file mode 100644 index 00000000..2cb0936c --- /dev/null +++ b/src/cpl_reactive_extensions/abc/unsubscribable.py @@ -0,0 +1,10 @@ +from abc import ABC, abstractmethod + + +class Unsubscribable(ABC): + def __init__(self): + ABC.__init__(self) + + @abstractmethod + def unsubscribe(self): + pass diff --git a/src/cpl_reactive_extensions/observable.py b/src/cpl_reactive_extensions/observable.py index 809c1ad7..b49447d5 100644 --- a/src/cpl_reactive_extensions/observable.py +++ b/src/cpl_reactive_extensions/observable.py @@ -1,19 +1,22 @@ from typing import Callable, Union, Optional -from cpl_reactive_extensions.observer import Observer +from cpl_reactive_extensions.abc.subscribable import Subscribable +from cpl_reactive_extensions.subscriber import Observer, Subscriber +from cpl_reactive_extensions.subscription import Subscription -class Observable: +class Observable(Subscribable): def __init__(self, callback: Callable = None): + Subscribable.__init__(self) self._callback = callback - self._observers: list[Observer] = [] + self._subscribers: list[Observer] = [] @staticmethod def from_list(values: list): i = 0 - def callback(x: Observer): + def callback(x: Subscriber): nonlocal i if i == len(values): i = 0 @@ -30,27 +33,27 @@ class Observable: def subscribe( self, observer_or_next: Union[Callable, Observer], on_error: Callable = None, on_complete: Callable = None - ) -> Observer: + ) -> Subscription: observable: Optional[Observable] = None if isinstance(observer_or_next, Observable): observable = observer_or_next if isinstance(observer_or_next, Callable): - observer = Observer(observer_or_next, on_error, on_complete) + subscriber = Subscriber(observer_or_next, on_error, on_complete) else: - observer = observer_or_next + subscriber = observer_or_next if self._callback is None: - self._observers.append(observer) - return observer + self._subscribers.append(subscriber) + return subscriber - if observable is not None and len(observable._observers) > 0: - for observer in observable._observers: - self._call(observer) + if observable is not None and len(observable._subscribers) > 0: + for subscriber in observable._subscribers: + self._call(subscriber) else: - self._call(observer) + self._call(subscriber) - return observer + return subscriber def _call(self, observer: Observer): try: diff --git a/src/cpl_reactive_extensions/observer.py b/src/cpl_reactive_extensions/subscriber.py similarity index 76% rename from src/cpl_reactive_extensions/observer.py rename to src/cpl_reactive_extensions/subscriber.py index 0c9dee24..41289c84 100644 --- a/src/cpl_reactive_extensions/observer.py +++ b/src/cpl_reactive_extensions/subscriber.py @@ -1,20 +1,17 @@ from typing import Callable from cpl_core.type import T +from cpl_reactive_extensions.abc.observer import Observer +from cpl_reactive_extensions.subscription import Subscription -class Observer: +class Subscriber(Subscription, Observer): def __init__(self, on_next: Callable, on_error: Callable = None, on_complete: Callable = None): + Subscription.__init__(self) self._on_next = on_next self._on_error = on_error self._on_complete = on_complete - self._closed = False - - @property - def closed(self) -> bool: - return self._closed - def next(self, value: T): if self._closed: raise Exception("Observer is closed") diff --git a/src/cpl_reactive_extensions/subscription.py b/src/cpl_reactive_extensions/subscription.py new file mode 100644 index 00000000..12046260 --- /dev/null +++ b/src/cpl_reactive_extensions/subscription.py @@ -0,0 +1,74 @@ +from __future__ import annotations + +from typing import Union, Callable, Optional + +from cpl_reactive_extensions.abc.unsubscribable import Unsubscribable + + +class Subscription(Unsubscribable): + def __init__(self, initial_teardown: Optional[Callable] = None): + Unsubscribable.__init__(self) + + self._initial_teardown = initial_teardown + + self._closed = False + self._subscriptions = [] + + self._parentage: list[Subscription] = [] + self._finalizers: list[Subscription] = [] + + @property + def closed(self) -> bool: + return self._closed + + def _add_parent(self, parent: Subscription): + self._parentage.append(parent) + + def _has_parent(self, parent: Subscription) -> bool: + return parent in self._parentage + + def _exec_finalizer(self, finalizer: Union[Callable, Unsubscribable]): + if isinstance(finalizer, Callable): + finalizer() + else: + finalizer.unsubscribe() + + def unsubscribe(self): + if not self._closed: + self._closed = True + + for parent in self._parentage: + parent.remove(self) + + if self._initial_teardown is not None: + try: + self._initial_teardown() + except Exception as e: + print(e) + + for finalizer in self._finalizers: + try: + self._exec_finalizer(finalizer) + except Exception as e: + print(e) + + self._subscriptions.clear() + + def add(self, tear_down: Union[Subscription, Unsubscribable]): + if tear_down is None or tear_down == self: + return + + if self.closed: + self._exec_finalizer(tear_down) + return + + if isinstance(tear_down, Subscription): + if tear_down.closed or tear_down._has_parent(self): + return + + tear_down._add_parent(self) + + self._subscriptions.append(tear_down) + + def remove(self, tear_down: Union[Subscription, Unsubscribable]): + self._subscriptions.remove(tear_down) diff --git a/unittests/unittests_reactive_extenstions/reactive_test_case.py b/unittests/unittests_reactive_extenstions/reactive_test_case.py index 96a28979..68952867 100644 --- a/unittests/unittests_reactive_extenstions/reactive_test_case.py +++ b/unittests/unittests_reactive_extenstions/reactive_test_case.py @@ -6,7 +6,7 @@ from threading import Timer from cpl_core.console import Console from cpl_reactive_extensions.behavior_subject import BehaviorSubject from cpl_reactive_extensions.observable import Observable -from cpl_reactive_extensions.observer import Observer +from cpl_reactive_extensions.subscriber import Observer from cpl_reactive_extensions.subject import Subject