sh_cpl/src/cpl_reactive_extensions/subscriber.py

46 lines
1.3 KiB
Python
Raw Normal View History

2023-04-15 16:17:31 +02:00
from typing import Callable
from cpl_core.type import T
2023-04-15 19:46:05 +02:00
from cpl_reactive_extensions.abc.observer import Observer
from cpl_reactive_extensions.subscription import Subscription
2023-04-15 21:41:49 +02:00
from cpl_reactive_extensions.type import ObserverOrCallable
2023-04-15 16:17:31 +02:00
2023-04-15 19:46:05 +02:00
class Subscriber(Subscription, Observer):
2023-04-15 21:41:49 +02:00
def __init__(
self, on_next_or_observer: ObserverOrCallable, on_error: Callable = None, on_complete: Callable = None
):
2023-04-15 19:46:05 +02:00
Subscription.__init__(self)
2023-04-15 21:41:49 +02:00
if isinstance(on_next_or_observer, Observer):
self._on_next = on_next_or_observer.next
self._on_error = on_next_or_observer.error
self._on_complete = on_next_or_observer.complete
else:
self._on_next = on_next_or_observer
self._on_error = on_error
self._on_complete = on_complete
2023-04-15 18:59:24 +02:00
2023-04-15 16:17:31 +02:00
def next(self, value: T):
2023-04-15 19:14:36 +02:00
if self._closed:
raise Exception("Observer is closed")
2023-04-15 16:17:31 +02:00
self._on_next(value)
def error(self, ex: Exception):
if self._on_error is None:
return
self._on_error(ex)
def complete(self):
2023-04-15 19:14:36 +02:00
self._closed = True
2023-04-15 16:17:31 +02:00
if self._on_complete is None:
return
self._on_complete()
2023-04-15 21:41:49 +02:00
def unsubscribe(self):
if self._closed:
return
super().unsubscribe()