diff --git a/src/cpl_reactive_extensions/abc/operator.py b/src/cpl_reactive_extensions/abc/operator.py new file mode 100644 index 00000000..f82425b4 --- /dev/null +++ b/src/cpl_reactive_extensions/abc/operator.py @@ -0,0 +1,12 @@ +from abc import ABC +from typing import Any + +from cpl_reactive_extensions.subscriber import Subscriber + + +class Operator(ABC): + def __init__(self): + ABC.__init__(self) + + def call(self, subscriber: Subscriber, source: Any): + pass diff --git a/src/cpl_reactive_extensions/observable.py b/src/cpl_reactive_extensions/observable.py index b49447d5..609e20ff 100644 --- a/src/cpl_reactive_extensions/observable.py +++ b/src/cpl_reactive_extensions/observable.py @@ -1,16 +1,19 @@ -from typing import Callable, Union, Optional +from typing import Callable, Union, Any, Optional +from cpl_reactive_extensions.abc.operator import Operator from cpl_reactive_extensions.abc.subscribable import Subscribable from cpl_reactive_extensions.subscriber import Observer, Subscriber from cpl_reactive_extensions.subscription import Subscription class Observable(Subscribable): - def __init__(self, callback: Callable = None): + def __init__(self, subscribe: Callable = None): Subscribable.__init__(self) - self._callback = callback + if subscribe is not None: + self._subscribe = subscribe - self._subscribers: list[Observer] = [] + self._source: Optional[Observable] = None + self._operator: Optional[Operator] = None @staticmethod def from_list(values: list): @@ -31,32 +34,48 @@ class Observable(Subscribable): observable = Observable(callback) return observable + @staticmethod + def _is_observer(value: Any) -> bool: + return isinstance(value, Observer) + + @staticmethod + def _is_subscription(value: Any) -> bool: + return isinstance(value, Subscription) + + @staticmethod + def _is_subscriber(value: Any) -> bool: + return isinstance(value, Subscriber) or Observable._is_observer(value) and Observable._is_subscription(value) + + def _subscribe(self, subscriber: Subscriber) -> Subscription: + return self._source.subscribe(subscriber) + def subscribe( self, observer_or_next: Union[Callable, Observer], on_error: Callable = None, on_complete: Callable = None ) -> Subscription: - observable: Optional[Observable] = None - if isinstance(observer_or_next, Observable): - observable = observer_or_next + subscriber = ( + observer_or_next + if Observable._is_subscriber(observer_or_next) + else Subscriber(observer_or_next, on_error, on_complete) + ) - if isinstance(observer_or_next, Callable): - subscriber = Subscriber(observer_or_next, on_error, on_complete) - else: - subscriber = observer_or_next - - if self._callback is None: - self._subscribers.append(subscriber) - return subscriber - - if observable is not None and len(observable._subscribers) > 0: - for subscriber in observable._subscribers: - self._call(subscriber) - else: - self._call(subscriber) + subscriber.add( + self._operator.call(subscriber, self._source) + if self._operator is not None + else self._subscribe(subscriber) + if self._source is not None + else self._try_subscribe(subscriber) + ) return subscriber + def _try_subscribe(self, subscriber: Subscriber): + try: + return self._subscribe(subscriber) + except Exception as e: + subscriber.error(e) + def _call(self, observer: Observer): try: - self._callback(observer) + self._subscribe(observer) except Exception as e: observer.error(e)