WIP: #170 #172
@ -33,17 +33,19 @@ class Subscriber(Subscription, Observer):
|
||||
def _error(self, ex: Exception):
|
||||
try:
|
||||
self._on_error(ex)
|
||||
except TypeError:
|
||||
pass
|
||||
finally:
|
||||
self.unsubscribe()
|
||||
|
||||
def error(self, ex: Exception):
|
||||
if self.is_stopped:
|
||||
return
|
||||
self._error(ex)
|
||||
|
||||
def _complete(self):
|
||||
try:
|
||||
self._on_complete()
|
||||
except TypeError:
|
||||
pass
|
||||
finally:
|
||||
self.unsubscribe()
|
||||
|
||||
@ -61,5 +63,3 @@ class Subscriber(Subscription, Observer):
|
||||
self.is_stopped = True
|
||||
Subscription.unsubscribe(self)
|
||||
self._on_next = None
|
||||
self._on_error = None
|
||||
self._on_complete = None
|
||||
|
@ -1,7 +1,9 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import traceback
|
||||
from typing import Union, Callable, Optional
|
||||
|
||||
from cpl_core.console import Console
|
||||
from cpl_reactive_extensions.abc.unsubscribable import Unsubscribable
|
||||
|
||||
|
||||
@ -60,7 +62,7 @@ class Subscription(Unsubscribable):
|
||||
try:
|
||||
self._initial_teardown()
|
||||
except Exception as e:
|
||||
print(e)
|
||||
Console.error(e, traceback.format_exc())
|
||||
|
||||
finalizers = self._finalizers
|
||||
self._finalizers = None
|
||||
@ -68,7 +70,7 @@ class Subscription(Unsubscribable):
|
||||
try:
|
||||
self._exec_finalizer(finalizer)
|
||||
except Exception as e:
|
||||
print(e)
|
||||
Console.error(e, traceback.format_exc())
|
||||
|
||||
def add(self, tear_down: Union[Subscription, Unsubscribable]):
|
||||
if tear_down is None or tear_down == self:
|
||||
@ -87,6 +89,7 @@ class Subscription(Unsubscribable):
|
||||
self._finalizers.append(tear_down)
|
||||
|
||||
def remove(self, tear_down: Union[Subscription, Unsubscribable]):
|
||||
if self._finalizers is not None:
|
||||
self._finalizers.remove(tear_down)
|
||||
|
||||
if isinstance(tear_down, Subscription):
|
||||
|
@ -72,22 +72,16 @@ class ReactiveTestCase(unittest.TestCase):
|
||||
Timer(1.0, complete).start()
|
||||
|
||||
def test_observer_completed(self):
|
||||
reached = False
|
||||
|
||||
def _test_complete(x: Observer):
|
||||
nonlocal reached
|
||||
|
||||
x.next(1)
|
||||
x.next(2)
|
||||
x.complete()
|
||||
reached = True
|
||||
x.next(3)
|
||||
|
||||
observable = Observable(_test_complete)
|
||||
|
||||
observable.subscribe(lambda x: Console.write_line(1, x), self._on_error)
|
||||
self.assertFalse(reached)
|
||||
self.assertFalse(self._error)
|
||||
self.assertTrue(self._error)
|
||||
|
||||
def test_observable_from(self):
|
||||
expected_x = 1
|
||||
|
Loading…
Reference in New Issue
Block a user