Added take until
This commit is contained in:
parent
b7d518022a
commit
39ca803d36
@ -1,9 +1,8 @@
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
from typing import Callable, Any, Optional
|
from typing import Callable, Any, Optional, Type
|
||||||
|
|
||||||
from cpl_core.type import T
|
from cpl_core.type import T
|
||||||
from cpl_reactive_extensions.abc.operator import Operator
|
|
||||||
from cpl_reactive_extensions.abc.subscribable import Subscribable
|
from cpl_reactive_extensions.abc.subscribable import Subscribable
|
||||||
from cpl_reactive_extensions.subscriber import Observer, Subscriber
|
from cpl_reactive_extensions.subscriber import Observer, Subscriber
|
||||||
from cpl_reactive_extensions.subscription import Subscription
|
from cpl_reactive_extensions.subscription import Subscription
|
||||||
@ -19,6 +18,16 @@ class Observable(Subscribable):
|
|||||||
self._source: Optional[Observable] = None
|
self._source: Optional[Observable] = None
|
||||||
self._operator: Optional[Callable] = None
|
self._operator: Optional[Callable] = None
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def from_observable(obs: Observable):
|
||||||
|
def inner(subscriber: Subscriber):
|
||||||
|
if "subscribe" not in dir(obs):
|
||||||
|
raise TypeError("Unable to lift unknown Observable type")
|
||||||
|
|
||||||
|
return obs.subscribe(subscriber)
|
||||||
|
|
||||||
|
return Observable(inner)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def from_list(values: list):
|
def from_list(values: list):
|
||||||
i = 0
|
i = 0
|
||||||
|
14
src/cpl_reactive_extensions/operators/take_until.py
Normal file
14
src/cpl_reactive_extensions/operators/take_until.py
Normal file
@ -0,0 +1,14 @@
|
|||||||
|
from cpl_core.type import T
|
||||||
|
from cpl_reactive_extensions import Subscriber, Observable
|
||||||
|
from cpl_reactive_extensions.operator_subscriber import OperatorSubscriber
|
||||||
|
from cpl_reactive_extensions.utils import operate
|
||||||
|
|
||||||
|
|
||||||
|
def take_until(notifier: Observable):
|
||||||
|
def init(source: Observable, subscriber: Subscriber):
|
||||||
|
Observable.from_observable(notifier).subscribe(OperatorSubscriber(subscriber, lambda: subscriber.complete()))
|
||||||
|
|
||||||
|
if not subscriber.closed:
|
||||||
|
source.subscribe(subscriber)
|
||||||
|
|
||||||
|
return operate(init)
|
@ -1,4 +1,5 @@
|
|||||||
from typing import Any, Optional
|
from types import NoneType
|
||||||
|
from typing import Any, Optional, Type
|
||||||
|
|
||||||
from cpl_core.type import T
|
from cpl_core.type import T
|
||||||
from cpl_reactive_extensions.abc.observer import Observer
|
from cpl_reactive_extensions.abc.observer import Observer
|
||||||
@ -8,11 +9,11 @@ from cpl_reactive_extensions.subscription import Subscription
|
|||||||
|
|
||||||
|
|
||||||
class Subject(Observable, Observer):
|
class Subject(Observable, Observer):
|
||||||
def __init__(self, _t: type):
|
def __init__(self, _t: Type[T]):
|
||||||
Observable.__init__(self)
|
Observable.__init__(self)
|
||||||
|
|
||||||
self.is_closed = False
|
self.is_closed = False
|
||||||
self._t = _t
|
self._t = _t if _t is not None else NoneType
|
||||||
self._current_observers: Optional[list[Observer]] = None
|
self._current_observers: Optional[list[Observer]] = None
|
||||||
|
|
||||||
self.closed = False
|
self.closed = False
|
||||||
|
@ -1,9 +1,12 @@
|
|||||||
|
import time
|
||||||
import traceback
|
import traceback
|
||||||
import unittest
|
import unittest
|
||||||
|
|
||||||
from cpl_core.console import Console
|
from cpl_core.console import Console
|
||||||
|
from cpl_reactive_extensions import Subject
|
||||||
from cpl_reactive_extensions.interval import Interval
|
from cpl_reactive_extensions.interval import Interval
|
||||||
from cpl_reactive_extensions.operators.take import take
|
from cpl_reactive_extensions.operators.take import take
|
||||||
|
from cpl_reactive_extensions.operators.take_until import take_until
|
||||||
|
|
||||||
|
|
||||||
class ObservableOperatorTestCase(unittest.TestCase):
|
class ObservableOperatorTestCase(unittest.TestCase):
|
||||||
@ -20,8 +23,45 @@ class ObservableOperatorTestCase(unittest.TestCase):
|
|||||||
self._completed = True
|
self._completed = True
|
||||||
|
|
||||||
def test_take_two(self):
|
def test_take_two(self):
|
||||||
|
count = 0
|
||||||
|
|
||||||
def sub(x):
|
def sub(x):
|
||||||
Console.write_line(x)
|
nonlocal count
|
||||||
|
|
||||||
|
count += 1
|
||||||
|
|
||||||
observable = Interval(0.1)
|
observable = Interval(0.1)
|
||||||
sub = observable.pipe(take(2)).subscribe(sub)
|
observable.pipe(take(2)).subscribe(sub)
|
||||||
|
time.sleep(0.5)
|
||||||
|
self.assertEqual(count, 2)
|
||||||
|
|
||||||
|
def test_take_five(self):
|
||||||
|
count = 0
|
||||||
|
|
||||||
|
def sub(x):
|
||||||
|
nonlocal count
|
||||||
|
|
||||||
|
count += 1
|
||||||
|
|
||||||
|
observable = Interval(0.1)
|
||||||
|
observable.pipe(take(5)).subscribe(sub)
|
||||||
|
time.sleep(1)
|
||||||
|
self.assertEqual(count, 5)
|
||||||
|
|
||||||
|
def test_take_until(self):
|
||||||
|
count = 0
|
||||||
|
unsubscriber = Subject(None)
|
||||||
|
|
||||||
|
def sub(x):
|
||||||
|
nonlocal count
|
||||||
|
|
||||||
|
count += 1
|
||||||
|
|
||||||
|
observable = Interval(0.1)
|
||||||
|
observable.pipe(take_until(unsubscriber)).subscribe(sub)
|
||||||
|
|
||||||
|
timer = 2
|
||||||
|
time.sleep(timer)
|
||||||
|
unsubscriber.next(None)
|
||||||
|
unsubscriber.complete()
|
||||||
|
self.assertEqual(count, timer * 10 - 1)
|
||||||
|
Loading…
Reference in New Issue
Block a user