diff --git a/src/cpl_reactive_extensions/interval.py b/src/cpl_reactive_extensions/interval.py index c33b0acc..9fe9163a 100644 --- a/src/cpl_reactive_extensions/interval.py +++ b/src/cpl_reactive_extensions/interval.py @@ -8,7 +8,7 @@ from cpl_reactive_extensions.subscriber import Subscriber class Interval(Observable): - def __init__(self, interval: float, callback: Callable = None): + def __init__(self, interval: float, callback: Callable = None, not_in_background=False): self._interval = interval callback = callback if callback is not None else self._default_callback @@ -26,7 +26,7 @@ class Interval(Observable): t = threading.Thread(target=schedule, args=(x,)) t.start() - Observable.__init__(self, thread) + Observable.__init__(self, schedule if not_in_background else thread) self._i = 0 def _run(self, scheduler, x: Subscriber, callback: Callable): diff --git a/src/cpl_reactive_extensions/observable.py b/src/cpl_reactive_extensions/observable.py index 0bab5258..94dabc29 100644 --- a/src/cpl_reactive_extensions/observable.py +++ b/src/cpl_reactive_extensions/observable.py @@ -17,7 +17,7 @@ class Observable(Subscribable): self._subscribe = subscribe self._source: Optional[Observable] = None - self._operator: Optional[Operator] = None + self._operator: Optional[Callable] = None @staticmethod def from_list(values: list): @@ -38,7 +38,7 @@ class Observable(Subscribable): observable = Observable(callback) return observable - def lift(self, operator: Operator) -> Observable: + def lift(self, operator: Callable) -> Observable: observable = Observable() observable._source = self observable._operator = operator @@ -75,7 +75,7 @@ class Observable(Subscribable): ) subscriber.add( - self._operator.call(subscriber, self._source) + self._operator(subscriber, self._source) if self._operator is not None else self._subscribe(subscriber) if self._source is not None @@ -84,18 +84,8 @@ class Observable(Subscribable): return subscriber - def _call(self, observer: Observer): - try: - self._subscribe(observer) - except Exception as e: - observer.error(e) - def pipe(self, *args) -> Observable: - # observables = [] - # for arg in args: - # observable = arg(self) - # observables.append(observable) - return self._pipe_from_array(args) + return self._pipe_from_array(args)(self) def _pipe_from_array(self, args): if len(args) == 0: diff --git a/src/cpl_reactive_extensions/operator_subscriber.py b/src/cpl_reactive_extensions/operator_subscriber.py index 28e3998e..f93ceac6 100644 --- a/src/cpl_reactive_extensions/operator_subscriber.py +++ b/src/cpl_reactive_extensions/operator_subscriber.py @@ -2,9 +2,10 @@ from typing import Callable from cpl_core.type import T from cpl_reactive_extensions import Subscriber +from cpl_reactive_extensions.abc import Observer -class OperatorSubscriber(Subscriber): +class OperatorSubscriber(Subscriber, Observer): def __init__( self, destination: Subscriber, @@ -14,19 +15,19 @@ class OperatorSubscriber(Subscriber): on_finalize: Callable = None, should_unsubscribe: Callable = None, ): - Subscriber.__init__(self) + Subscriber.__init__(self, destination) self._on_finalize = on_finalize self._should_unsubscribe = should_unsubscribe - def on_next_wrapper(self: OperatorSubscriber, value: T): + def on_next_wrapper(value: T): try: on_next(value) except Exception as e: destination.error(e) - self._on_next = on_next_wrapper if on_next is not None else super()._on_next + self._on_next = on_next_wrapper if on_next is not None else self._on_next - def on_error_wrapper(self: OperatorSubscriber, value: T): + def on_error_wrapper(value: T): try: on_error(value) except Exception as e: @@ -34,9 +35,9 @@ class OperatorSubscriber(Subscriber): finally: self.unsubscribe() - self._on_error = on_error_wrapper if on_error is not None else super()._on_error + self._on_error = on_error_wrapper if on_error is not None else self._on_error - def on_complete_wrapper(self: OperatorSubscriber, value: T): + def on_complete_wrapper(value: T): try: on_complete(value) except Exception as e: @@ -44,10 +45,11 @@ class OperatorSubscriber(Subscriber): finally: self.unsubscribe() - self._on_complete = on_complete_wrapper if on_complete is not None else super()._on_complete + self._on_complete = on_complete_wrapper if on_complete is not None else self._on_complete def unsubscribe(self): - if self._should_unsubscribe and not self._should_unsubscribe(): + if self._should_unsubscribe is not None and not self._should_unsubscribe(): return - super().unsubscribe() - not self.closed and self._on_finalize is not None and self._on_finalize() + Subscriber.unsubscribe(self) + if not self.closed and self._on_finalize is not None: + self._on_finalize() diff --git a/src/cpl_reactive_extensions/operators/take.py b/src/cpl_reactive_extensions/operators/take.py index b9c64af0..ec93f8b0 100644 --- a/src/cpl_reactive_extensions/operators/take.py +++ b/src/cpl_reactive_extensions/operators/take.py @@ -11,7 +11,7 @@ def take(count: int): def init(source: Observable, subscriber: Subscriber): seen = 0 - def sub(value: T): + def on_next(value: T): nonlocal seen if seen + 1 <= count: @@ -20,7 +20,9 @@ def take(count: int): if count <= seen: subscriber.complete() + else: + sub.unsubscribe() - source.subscribe(OperatorSubscriber(subscriber, sub)) + sub = source.subscribe(OperatorSubscriber(subscriber, on_next)) return operate(init) diff --git a/src/cpl_reactive_extensions/subscriber.py b/src/cpl_reactive_extensions/subscriber.py index 5c4959ba..d4ece2f4 100644 --- a/src/cpl_reactive_extensions/subscriber.py +++ b/src/cpl_reactive_extensions/subscriber.py @@ -21,29 +21,45 @@ class Subscriber(Subscription, Observer): self._on_error = on_error self._on_complete = on_complete + def _next(self, value: T): + self._on_next(value) + def next(self, value: T): if self.is_stopped: raise Exception("Observer is closed") - self._on_next(value) + self._next(value) + + def _error(self, ex: Exception): + try: + self._on_error(ex) + finally: + self.unsubscribe() def error(self, ex: Exception): if self.is_stopped: return - self._on_error(ex) + self._error(ex) + + def _complete(self): + try: + self._on_complete() + finally: + self.unsubscribe() def complete(self): if self.is_stopped: return self.is_stopped = True - self._on_complete() + self._complete() def unsubscribe(self): if self._closed: return - super().unsubscribe() + self.is_stopped = True + Subscription.unsubscribe(self) self._on_next = None self._on_error = None self._on_complete = None diff --git a/src/cpl_reactive_extensions/subscription.py b/src/cpl_reactive_extensions/subscription.py index 70c5afa2..2b8644e0 100644 --- a/src/cpl_reactive_extensions/subscription.py +++ b/src/cpl_reactive_extensions/subscription.py @@ -62,14 +62,14 @@ class Subscription(Unsubscribable): except Exception as e: print(e) - for finalizer in self._finalizers: + finalizers = self._finalizers + self._finalizers = None + for finalizer in finalizers: try: self._exec_finalizer(finalizer) except Exception as e: print(e) - self._finalizers = None - def add(self, tear_down: Union[Subscription, Unsubscribable]): if tear_down is None or tear_down == self: return diff --git a/src/cpl_reactive_extensions/utils.py b/src/cpl_reactive_extensions/utils.py index 1fd7da18..1be7dc42 100644 --- a/src/cpl_reactive_extensions/utils.py +++ b/src/cpl_reactive_extensions/utils.py @@ -1,10 +1,9 @@ from typing import Callable from cpl_reactive_extensions import Observable, Subscriber -from cpl_reactive_extensions.abc import Operator -def operate(init: Callable[[Observable, Subscriber], Operator]): +def operate(init: Callable[[Observable, Subscriber], None]): def observable(source: Observable): def create(self: Subscriber, lifted_source: Observable): try: @@ -12,12 +11,9 @@ def operate(init: Callable[[Observable, Subscriber], Operator]): except Exception as e: self.error(e) - operator = Operator() - operator.call = create - if "lift" not in dir(source): raise TypeError("Unable to lift unknown Observable type") - return source.lift(operator) + return source.lift(create) return observable diff --git a/unittests/unittests_reactive_extenstions/observable_operator.py b/unittests/unittests_reactive_extenstions/observable_operator.py index f31b539a..ce451735 100644 --- a/unittests/unittests_reactive_extenstions/observable_operator.py +++ b/unittests/unittests_reactive_extenstions/observable_operator.py @@ -23,5 +23,5 @@ class ObservableOperatorTestCase(unittest.TestCase): def sub(x): Console.write_line(x) - observable = Interval(1.0) + observable = Interval(0.1) sub = observable.pipe(take(2)).subscribe(sub)