Added subject
This commit is contained in:
@@ -1,18 +1,56 @@
|
||||
from typing import Callable
|
||||
from typing import Callable, Union
|
||||
|
||||
from cpl_reactive_extensions.observer import Observer
|
||||
|
||||
|
||||
class Observable:
|
||||
def __init__(self, callback: Callable):
|
||||
def __init__(self, callback: Callable = None):
|
||||
self._callback = callback
|
||||
self._subscriptions: list[Callable] = []
|
||||
|
||||
def _run_subscriptions(self):
|
||||
for callback in self._subscriptions:
|
||||
callback()
|
||||
self._observers: list[Observer] = []
|
||||
|
||||
def subscribe(self, observer: Observer):
|
||||
@staticmethod
|
||||
def from_list(values: list):
|
||||
i = 0
|
||||
|
||||
def callback(x: Observer):
|
||||
nonlocal i
|
||||
if i == len(values):
|
||||
i = 0
|
||||
x.complete()
|
||||
else:
|
||||
x.next(values[i])
|
||||
i += 1
|
||||
|
||||
if not x.closed:
|
||||
callback(x)
|
||||
|
||||
observable = Observable(callback)
|
||||
return observable
|
||||
|
||||
def subscribe(
|
||||
self, observer_or_next: Union[Callable, Observer], on_error: Callable = None, on_complete: Callable = None
|
||||
) -> Observer:
|
||||
if isinstance(observer_or_next, Callable):
|
||||
observer = Observer(observer_or_next, on_error, on_complete)
|
||||
elif isinstance(observer_or_next, Observable):
|
||||
observer = observer_or_next
|
||||
else:
|
||||
observer = observer_or_next
|
||||
|
||||
if self._callback is None:
|
||||
self._observers.append(observer)
|
||||
return observer
|
||||
|
||||
if len(observer._observers) > 0:
|
||||
for observer in observer._observers:
|
||||
self._call(observer)
|
||||
else:
|
||||
self._call(observer)
|
||||
|
||||
return observer
|
||||
|
||||
def _call(self, observer: Observer):
|
||||
try:
|
||||
self._callback(observer)
|
||||
except Exception as e:
|
||||
|
@@ -6,8 +6,14 @@ from cpl_core.type import T
|
||||
class Observer:
|
||||
def __init__(self, on_next: Callable, on_error: Callable = None, on_complete: Callable = None):
|
||||
self._on_next = on_next
|
||||
self._on_error = on_error if on_error is not None else lambda err: err
|
||||
self._on_complete = on_complete if on_complete is not None else lambda x: x
|
||||
self._on_error = on_error
|
||||
self._on_complete = on_complete
|
||||
|
||||
self._closed = False
|
||||
|
||||
@property
|
||||
def closed(self) -> bool:
|
||||
return self._closed
|
||||
|
||||
def next(self, value: T):
|
||||
self._on_next(value)
|
||||
|
@@ -3,15 +3,18 @@ from cpl_reactive_extensions.observable import Observable
|
||||
|
||||
|
||||
class Subject(Observable):
|
||||
def __init__(self):
|
||||
def __init__(self, _t: type):
|
||||
Observable.__init__(self)
|
||||
|
||||
self._t = _t
|
||||
self._value: T = None
|
||||
|
||||
@property
|
||||
def value(self) -> T:
|
||||
return self._value
|
||||
|
||||
def emit(self, value: T):
|
||||
def next(self, value: T):
|
||||
if not isinstance(value, self._t):
|
||||
raise TypeError(f"Expected {self._t.__name__} not {type(value).__name__}")
|
||||
|
||||
self._value = value
|
||||
self._subscriptions()
|
||||
|
Reference in New Issue
Block a user