diff --git a/cpl-workspace.json b/cpl-workspace.json index e00ea987..f4c97235 100644 --- a/cpl-workspace.json +++ b/cpl-workspace.json @@ -6,6 +6,7 @@ "cpl-core": "src/cpl_core/cpl-core.json", "cpl-discord": "src/cpl_discord/cpl-discord.json", "cpl-query": "src/cpl_query/cpl-query.json", + "cpl-reactive-extensions": "src/cpl_reactive_extensions/cpl-reactive-extensions.json", "cpl-translation": "src/cpl_translation/cpl-translation.json", "set-version": "tools/set_version/set-version.json", "set-pip-urls": "tools/set_pip_urls/set-pip-urls.json", @@ -14,7 +15,8 @@ "unittests_core": "unittests/unittests_core/unittests_core.json", "unittests_query": "unittests/unittests_query/unittests_query.json", "unittests_shared": "unittests/unittests_shared/unittests_shared.json", - "unittests_translation": "unittests/unittests_translation/unittests_translation.json" + "unittests_translation": "unittests/unittests_translation/unittests_translation.json", + "unittests_reactive_extenstions": "unittests/unittests_reactive_extenstions/unittests_reactive_extenstions.json" }, "Scripts": { "hello-world": "echo 'Hello World'", @@ -40,12 +42,13 @@ "test": "cpl run unittests", "pre-build-all": "cpl sv $ARGS; cpl spu $ARGS;", - "build-all": "cpl build-cli; cpl build-core; cpl build-discord; cpl build-query; cpl build-translation; cpl build-set-pip-urls; cpl build-set-version", + "build-all": "cpl build-cli; cpl build-core; cpl build-discord; cpl build-query; cpl build-reactive-extensions; cpl build-translation; cpl build-set-pip-urls; cpl build-set-version", "ba": "cpl build-all $ARGS", "build-cli": "echo 'Build cpl-cli'; cd ./src/cpl_cli; cpl build; cd ../../;", "build-core": "echo 'Build cpl-core'; cd ./src/cpl_core; cpl build; cd ../../;", "build-discord": "echo 'Build cpl-discord'; cd ./src/cpl_discord; cpl build; cd ../../;", "build-query": "echo 'Build cpl-query'; cd ./src/cpl_query; cpl build; cd ../../;", + "build-reactive-extensions": "echo 'Build cpl-reactive-x'; cd ./src/cpl_reactive_extensions; cpl build; cd ../../;", "build-translation": "echo 'Build cpl-translation'; cd ./src/cpl_translation; cpl build; cd ../../;", "build-set-pip-urls": "echo 'Build set-pip-urls'; cd ./tools/set_pip_urls; cpl build; cd ../../;", "build-set-version": "echo 'Build set-version'; cd ./tools/set_version; cpl build; cd ../../;", @@ -57,6 +60,7 @@ "publish-core": "echo 'Publish cpl-core'; cd ./src/cpl_core; cpl publish; cd ../../;", "publish-discord": "echo 'Publish cpl-discord'; cd ./src/cpl_discord; cpl publish; cd ../../;", "publish-query": "echo 'Publish cpl-query'; cd ./src/cpl_query; cpl publish; cd ../../;", + "publish-reactive-extensions": "echo 'Publish cpl-reactive-x'; cd ./src/cpl_reactive_extensions; cpl publish; cd ../../;", "publish-translation": "echo 'Publish cpl-translation'; cd ./src/cpl_translation; cpl publish; cd ../../;", "upload-prod-cli": "echo 'PROD Upload cpl-cli'; cpl upl-prod-cli;", @@ -71,6 +75,9 @@ "upload-prod-query": "echo 'PROD Upload cpl-query'; cpl upl-prod-query;", "upl-prod-query": "twine upload -r pip.sh-edraft.de dist/cpl-query/publish/setup/*", + "upload-prod-reactive-extensions": "echo 'PROD Upload cpl-reactive-extensions'; cpl upl-prod-query;", + "upl-prod-reactive-extensions": "twine upload -r pip.sh-edraft.de dist/cpl-reactive-extensions/publish/setup/*", + "upload-prod-translation": "echo 'PROD Upload cpl-translation'; cpl upl-prod-translation;", "upl-prod-translation": "twine upload -r pip.sh-edraft.de dist/cpl-translation/publish/setup/*", @@ -86,6 +93,9 @@ "upload-exp-query": "echo 'EXP Upload cpl-query'; cpl upl-exp-query;", "upl-exp-query": "twine upload -r pip-exp.sh-edraft.de dist/cpl-query/publish/setup/*", + "upload-exp-reactive-extensions": "echo 'EXP Upload cpl-reactive-extensions'; cpl upl-exp-query;", + "upl-exp-reactive-extensions": "twine upload -r pip-exp.sh-edraft.de dist/cpl-reactive-extensions/publish/setup/*", + "upload-exp-translation": "echo 'EXP Upload cpl-translation'; cpl upl-exp-translation;", "upl-exp-translation": "twine upload -r pip-exp.sh-edraft.de dist/cpl-translation/publish/setup/*", @@ -101,6 +111,9 @@ "upload-dev-query": "echo 'DEV Upload cpl-query'; cpl upl-dev-query;", "upl-dev-query": "twine upload -r pip-dev.sh-edraft.de dist/cpl-query/publish/setup/*", + "upload-dev-reactive-extensions": "echo 'DEV Upload cpl-reactive-extensions'; cpl upl-dev-query;", + "upl-dev-reactive-extensions": "twine upload -r pip-dev.sh-edraft.de dist/cpl-reactive-extensions/publish/setup/*", + "upload-dev-translation": "echo 'DEV Upload cpl-translation'; cpl upl-dev-translation;", "upl-dev-translation": "twine upload -r pip-dev.sh-edraft.de dist/cpl-translation/publish/setup/*", diff --git a/src/cpl_core/__init__.py b/src/cpl_core/__init__.py index 33f82aae..072bdd31 100644 --- a/src/cpl_core/__init__.py +++ b/src/cpl_core/__init__.py @@ -15,7 +15,7 @@ __title__ = "cpl_core" __author__ = "Sven Heidemann" __license__ = "MIT" __copyright__ = "Copyright (c) 2020 - 2023 sh-edraft.de" -__version__ = "2023.4.0.post1" +__version__ = "2023.4.dev170" from collections import namedtuple @@ -23,4 +23,4 @@ from collections import namedtuple # imports: VersionInfo = namedtuple("VersionInfo", "major minor micro") -version_info = VersionInfo(major="2023", minor="4", micro="0.post1") +version_info = VersionInfo(major="2023", minor="4", micro="dev170") diff --git a/src/cpl_core/application/__init__.py b/src/cpl_core/application/__init__.py index 3a87cd69..e53006c1 100644 --- a/src/cpl_core/application/__init__.py +++ b/src/cpl_core/application/__init__.py @@ -15,7 +15,7 @@ __title__ = "cpl_core.application" __author__ = "Sven Heidemann" __license__ = "MIT" __copyright__ = "Copyright (c) 2020 - 2023 sh-edraft.de" -__version__ = "2023.4.0.post1" +__version__ = "2023.4.dev170" from collections import namedtuple @@ -29,4 +29,4 @@ from .startup_abc import StartupABC from .startup_extension_abc import StartupExtensionABC VersionInfo = namedtuple("VersionInfo", "major minor micro") -version_info = VersionInfo(major="2023", minor="4", micro="0.post1") +version_info = VersionInfo(major="2023", minor="4", micro="dev170") diff --git a/src/cpl_core/configuration/__init__.py b/src/cpl_core/configuration/__init__.py index 1c84582e..91bdf545 100644 --- a/src/cpl_core/configuration/__init__.py +++ b/src/cpl_core/configuration/__init__.py @@ -15,7 +15,7 @@ __title__ = "cpl_core.configuration" __author__ = "Sven Heidemann" __license__ = "MIT" __copyright__ = "Copyright (c) 2020 - 2023 sh-edraft.de" -__version__ = "2023.4.0.post1" +__version__ = "2023.4.dev170" from collections import namedtuple @@ -35,4 +35,4 @@ from .validator_abc import ValidatorABC from .variable_argument import VariableArgument VersionInfo = namedtuple("VersionInfo", "major minor micro") -version_info = VersionInfo(major="2023", minor="4", micro="0.post1") +version_info = VersionInfo(major="2023", minor="4", micro="dev170") diff --git a/src/cpl_core/console/__init__.py b/src/cpl_core/console/__init__.py index 1f85043b..fedef234 100644 --- a/src/cpl_core/console/__init__.py +++ b/src/cpl_core/console/__init__.py @@ -15,7 +15,7 @@ __title__ = "cpl_core.console" __author__ = "Sven Heidemann" __license__ = "MIT" __copyright__ = "Copyright (c) 2020 - 2023 sh-edraft.de" -__version__ = "2023.4.0.post1" +__version__ = "2023.4.dev170" from collections import namedtuple @@ -28,4 +28,4 @@ from .foreground_color_enum import ForegroundColorEnum from .spinner_thread import SpinnerThread VersionInfo = namedtuple("VersionInfo", "major minor micro") -version_info = VersionInfo(major="2023", minor="4", micro="0.post1") +version_info = VersionInfo(major="2023", minor="4", micro="dev170") diff --git a/src/cpl_core/cpl-core.json b/src/cpl_core/cpl-core.json index eccc2894..5f207a4e 100644 --- a/src/cpl_core/cpl-core.json +++ b/src/cpl_core/cpl-core.json @@ -4,7 +4,7 @@ "Version": { "Major": "2023", "Minor": "4", - "Micro": "0.post1" + "Micro": "dev170" }, "Author": "Sven Heidemann", "AuthorEmail": "sven.heidemann@sh-edraft.de", diff --git a/src/cpl_core/database/__init__.py b/src/cpl_core/database/__init__.py index 2ddbf2ce..e700c92c 100644 --- a/src/cpl_core/database/__init__.py +++ b/src/cpl_core/database/__init__.py @@ -15,7 +15,7 @@ __title__ = "cpl_core.database" __author__ = "Sven Heidemann" __license__ = "MIT" __copyright__ = "Copyright (c) 2020 - 2023 sh-edraft.de" -__version__ = "2023.4.0.post1" +__version__ = "2023.4.dev170" from collections import namedtuple @@ -26,4 +26,4 @@ from .database_settings import DatabaseSettings from .table_abc import TableABC VersionInfo = namedtuple("VersionInfo", "major minor micro") -version_info = VersionInfo(major="2023", minor="4", micro="0.post1") +version_info = VersionInfo(major="2023", minor="4", micro="dev170") diff --git a/src/cpl_core/database/connection/__init__.py b/src/cpl_core/database/connection/__init__.py index 5354b745..d2f31ae4 100644 --- a/src/cpl_core/database/connection/__init__.py +++ b/src/cpl_core/database/connection/__init__.py @@ -15,7 +15,7 @@ __title__ = "cpl_core.database.connection" __author__ = "Sven Heidemann" __license__ = "MIT" __copyright__ = "Copyright (c) 2020 - 2023 sh-edraft.de" -__version__ = "2023.4.0.post1" +__version__ = "2023.4.dev170" from collections import namedtuple @@ -25,4 +25,4 @@ from .database_connection import DatabaseConnection from .database_connection_abc import DatabaseConnectionABC VersionInfo = namedtuple("VersionInfo", "major minor micro") -version_info = VersionInfo(major="2023", minor="4", micro="0.post1") +version_info = VersionInfo(major="2023", minor="4", micro="dev170") diff --git a/src/cpl_core/database/context/__init__.py b/src/cpl_core/database/context/__init__.py index 7b72b757..a1e122fd 100644 --- a/src/cpl_core/database/context/__init__.py +++ b/src/cpl_core/database/context/__init__.py @@ -15,7 +15,7 @@ __title__ = "cpl_core.database.context" __author__ = "Sven Heidemann" __license__ = "MIT" __copyright__ = "Copyright (c) 2020 - 2023 sh-edraft.de" -__version__ = "2023.4.0.post1" +__version__ = "2023.4.dev170" from collections import namedtuple @@ -25,4 +25,4 @@ from .database_context import DatabaseContext from .database_context_abc import DatabaseContextABC VersionInfo = namedtuple("VersionInfo", "major minor micro") -version_info = VersionInfo(major="2023", minor="4", micro="0.post1") +version_info = VersionInfo(major="2023", minor="4", micro="dev170") diff --git a/src/cpl_core/dependency_injection/__init__.py b/src/cpl_core/dependency_injection/__init__.py index b5474ba1..05e6a176 100644 --- a/src/cpl_core/dependency_injection/__init__.py +++ b/src/cpl_core/dependency_injection/__init__.py @@ -15,7 +15,7 @@ __title__ = "cpl_core.dependency_injection" __author__ = "Sven Heidemann" __license__ = "MIT" __copyright__ = "Copyright (c) 2020 - 2023 sh-edraft.de" -__version__ = "2023.4.0.post1" +__version__ = "2023.4.dev170" from collections import namedtuple @@ -31,4 +31,4 @@ from .service_provider import ServiceProvider from .service_provider_abc import ServiceProviderABC VersionInfo = namedtuple("VersionInfo", "major minor micro") -version_info = VersionInfo(major="2023", minor="4", micro="0.post1") +version_info = VersionInfo(major="2023", minor="4", micro="dev170") diff --git a/src/cpl_core/environment/__init__.py b/src/cpl_core/environment/__init__.py index 2e0a30cb..629f82c1 100644 --- a/src/cpl_core/environment/__init__.py +++ b/src/cpl_core/environment/__init__.py @@ -15,7 +15,7 @@ __title__ = "cpl_core.environment" __author__ = "Sven Heidemann" __license__ = "MIT" __copyright__ = "Copyright (c) 2020 - 2023 sh-edraft.de" -__version__ = "2023.4.0.post1" +__version__ = "2023.4.dev170" from collections import namedtuple @@ -26,4 +26,4 @@ from .environment_name_enum import EnvironmentNameEnum from .application_environment import ApplicationEnvironment VersionInfo = namedtuple("VersionInfo", "major minor micro") -version_info = VersionInfo(major="2023", minor="4", micro="0.post1") +version_info = VersionInfo(major="2023", minor="4", micro="dev170") diff --git a/src/cpl_core/logging/__init__.py b/src/cpl_core/logging/__init__.py index 5c812d5a..89d50924 100644 --- a/src/cpl_core/logging/__init__.py +++ b/src/cpl_core/logging/__init__.py @@ -15,7 +15,7 @@ __title__ = "cpl_core.logging" __author__ = "Sven Heidemann" __license__ = "MIT" __copyright__ = "Copyright (c) 2020 - 2023 sh-edraft.de" -__version__ = "2023.4.0.post1" +__version__ = "2023.4.dev170" from collections import namedtuple @@ -28,4 +28,4 @@ from .logging_settings import LoggingSettings from .logging_settings_name_enum import LoggingSettingsNameEnum VersionInfo = namedtuple("VersionInfo", "major minor micro") -version_info = VersionInfo(major="2023", minor="4", micro="0.post1") +version_info = VersionInfo(major="2023", minor="4", micro="dev170") diff --git a/src/cpl_core/mailing/__init__.py b/src/cpl_core/mailing/__init__.py index 2ae416dd..42bfec59 100644 --- a/src/cpl_core/mailing/__init__.py +++ b/src/cpl_core/mailing/__init__.py @@ -15,7 +15,7 @@ __title__ = "cpl_core.mailing" __author__ = "Sven Heidemann" __license__ = "MIT" __copyright__ = "Copyright (c) 2020 - 2023 sh-edraft.de" -__version__ = "2023.4.0.post1" +__version__ = "2023.4.dev170" from collections import namedtuple @@ -28,4 +28,4 @@ from .email_client_settings import EMailClientSettings from .email_client_settings_name_enum import EMailClientSettingsNameEnum VersionInfo = namedtuple("VersionInfo", "major minor micro") -version_info = VersionInfo(major="2023", minor="4", micro="0.post1") +version_info = VersionInfo(major="2023", minor="4", micro="dev170") diff --git a/src/cpl_core/pipes/__init__.py b/src/cpl_core/pipes/__init__.py index 956b2267..18cf42d0 100644 --- a/src/cpl_core/pipes/__init__.py +++ b/src/cpl_core/pipes/__init__.py @@ -15,7 +15,7 @@ __title__ = "cpl_core.pipes" __author__ = "Sven Heidemann" __license__ = "MIT" __copyright__ = "Copyright (c) 2020 - 2023 sh-edraft.de" -__version__ = "2023.4.0.post1" +__version__ = "2023.4.dev170" from collections import namedtuple @@ -26,4 +26,4 @@ from .ip_address_pipe import IPAddressPipe from .pipe_abc import PipeABC VersionInfo = namedtuple("VersionInfo", "major minor micro") -version_info = VersionInfo(major="2023", minor="4", micro="0.post1") +version_info = VersionInfo(major="2023", minor="4", micro="dev170") diff --git a/src/cpl_core/time/__init__.py b/src/cpl_core/time/__init__.py index 100defe5..cfa94114 100644 --- a/src/cpl_core/time/__init__.py +++ b/src/cpl_core/time/__init__.py @@ -15,7 +15,7 @@ __title__ = "cpl_core.time" __author__ = "Sven Heidemann" __license__ = "MIT" __copyright__ = "Copyright (c) 2020 - 2023 sh-edraft.de" -__version__ = "2023.4.0.post1" +__version__ = "2023.4.dev170" from collections import namedtuple @@ -25,4 +25,4 @@ from .time_format_settings import TimeFormatSettings from .time_format_settings_names_enum import TimeFormatSettingsNamesEnum VersionInfo = namedtuple("VersionInfo", "major minor micro") -version_info = VersionInfo(major="2023", minor="4", micro="0.post1") +version_info = VersionInfo(major="2023", minor="4", micro="dev170") diff --git a/src/cpl_core/type.py b/src/cpl_core/type.py index b186d5ad..bc2b5aa8 100644 --- a/src/cpl_core/type.py +++ b/src/cpl_core/type.py @@ -1,3 +1,4 @@ -from typing import TypeVar +from typing import TypeVar, Union T = TypeVar("T") +Number = Union[int, float] diff --git a/src/cpl_core/utils/__init__.py b/src/cpl_core/utils/__init__.py index 27e32634..83f2b99d 100644 --- a/src/cpl_core/utils/__init__.py +++ b/src/cpl_core/utils/__init__.py @@ -15,7 +15,7 @@ __title__ = "cpl_core.utils" __author__ = "Sven Heidemann" __license__ = "MIT" __copyright__ = "Copyright (c) 2020 - 2023 sh-edraft.de" -__version__ = "2023.4.0.post1" +__version__ = "2023.4.dev170" from collections import namedtuple @@ -26,4 +26,4 @@ from .string import String from .pip import Pip VersionInfo = namedtuple("VersionInfo", "major minor micro") -version_info = VersionInfo(major="2023", minor="4", micro="0.post1") +version_info = VersionInfo(major="2023", minor="4", micro="dev170") diff --git a/src/cpl_reactive_extensions/__init__.py b/src/cpl_reactive_extensions/__init__.py new file mode 100644 index 00000000..2b4c06ff --- /dev/null +++ b/src/cpl_reactive_extensions/__init__.py @@ -0,0 +1,26 @@ +# -*- coding: utf-8 -*- + +""" +cpl-reactive-extensions CPL Simple ReactiveX implementation +~~~~~~~~~~~~~~~~~~~ + +CPL Simple ReactiveX implementation, see RxJS and RxPy for detailed implementation. + +:copyright: (c) 2023 sh-edraft.de +:license: MIT, see LICENSE for more details. + +""" + +__title__ = "cpl_reactive_extensions" +__author__ = "Sven Heidemann" +__license__ = "MIT" +__copyright__ = "Copyright (c) 2023 sh-edraft.de" +__version__ = "2023.4.dev170" + +from collections import namedtuple + + +# imports + +VersionInfo = namedtuple("VersionInfo", "major minor micro") +version_info = VersionInfo(major="2023", minor="4", micro="dev170") diff --git a/src/cpl_reactive_extensions/abc/__init__.py b/src/cpl_reactive_extensions/abc/__init__.py new file mode 100644 index 00000000..41ecc2b3 --- /dev/null +++ b/src/cpl_reactive_extensions/abc/__init__.py @@ -0,0 +1,26 @@ +# -*- coding: utf-8 -*- + +""" +cpl-reactive-extensions CPL Simple ReactiveX implementation +~~~~~~~~~~~~~~~~~~~ + +CPL Simple ReactiveX implementation, see RxJS and RxPy for detailed implementation. + +:copyright: (c) 2023 sh-edraft.de +:license: MIT, see LICENSE for more details. + +""" + +__title__ = "cpl_reactive_extensions.abc" +__author__ = "Sven Heidemann" +__license__ = "MIT" +__copyright__ = "Copyright (c) 2023 sh-edraft.de" +__version__ = "2023.4.dev170" + +from collections import namedtuple + + +# imports + +VersionInfo = namedtuple("VersionInfo", "major minor micro") +version_info = VersionInfo(major="2023", minor="4", micro="dev170") diff --git a/src/cpl_reactive_extensions/abc/observer.py b/src/cpl_reactive_extensions/abc/observer.py new file mode 100644 index 00000000..be2b0c70 --- /dev/null +++ b/src/cpl_reactive_extensions/abc/observer.py @@ -0,0 +1,20 @@ +from abc import abstractmethod + +from cpl_core.type import T + + +class Observer: + def __init__(self): + pass + + @abstractmethod + def next(self, value: T): + pass + + @abstractmethod + def error(self, ex: Exception): + pass + + @abstractmethod + def complete(self): + pass diff --git a/src/cpl_reactive_extensions/abc/operator.py b/src/cpl_reactive_extensions/abc/operator.py new file mode 100644 index 00000000..e2bb15a9 --- /dev/null +++ b/src/cpl_reactive_extensions/abc/operator.py @@ -0,0 +1,8 @@ +from typing import Any + +from cpl_reactive_extensions.internal.subscriber import Subscriber + + +class Operator: + def call(self, subscriber: Subscriber, source: Any): + pass diff --git a/src/cpl_reactive_extensions/abc/scheduler_action.py b/src/cpl_reactive_extensions/abc/scheduler_action.py new file mode 100644 index 00000000..82aed704 --- /dev/null +++ b/src/cpl_reactive_extensions/abc/scheduler_action.py @@ -0,0 +1,10 @@ +from abc import ABC, abstractmethod + +from cpl_core.type import T, Number +from cpl_reactive_extensions.internal.subscription import Subscription + + +class SchedulerAction(ABC): + @abstractmethod + def schedule(self, state: T = None, delay: Number = None) -> Subscription: + pass diff --git a/src/cpl_reactive_extensions/abc/scheduler_like.py b/src/cpl_reactive_extensions/abc/scheduler_like.py new file mode 100644 index 00000000..d3740cd0 --- /dev/null +++ b/src/cpl_reactive_extensions/abc/scheduler_like.py @@ -0,0 +1,12 @@ +from abc import ABC, abstractmethod +from typing import Callable, Optional + +from cpl_core.type import Number, T +from cpl_reactive_extensions.internal.subscription import Subscription +from cpl_reactive_extensions.abc.scheduler_action import SchedulerAction + + +class SchedulerLike(ABC): + @abstractmethod + def schedule(self, work: Callable[[SchedulerAction, Optional[T]], None], delay: Number, state: T) -> Subscription: + pass diff --git a/src/cpl_reactive_extensions/abc/subscribable.py b/src/cpl_reactive_extensions/abc/subscribable.py new file mode 100644 index 00000000..5b5bed79 --- /dev/null +++ b/src/cpl_reactive_extensions/abc/subscribable.py @@ -0,0 +1,16 @@ +from abc import ABC, abstractmethod +from typing import Union, Callable + +from cpl_reactive_extensions.abc.observer import Observer +from cpl_reactive_extensions.abc.unsubscribable import Unsubscribable + + +class Subscribable(ABC): + def __init__(self): + ABC.__init__(self) + + @abstractmethod + def subscribe( + self, observer_or_next: Union[Callable, Observer], on_error: Callable = None, on_complete: Callable = None + ) -> Unsubscribable: + pass diff --git a/src/cpl_reactive_extensions/abc/unsubscribable.py b/src/cpl_reactive_extensions/abc/unsubscribable.py new file mode 100644 index 00000000..2cb0936c --- /dev/null +++ b/src/cpl_reactive_extensions/abc/unsubscribable.py @@ -0,0 +1,10 @@ +from abc import ABC, abstractmethod + + +class Unsubscribable(ABC): + def __init__(self): + ABC.__init__(self) + + @abstractmethod + def unsubscribe(self): + pass diff --git a/src/cpl_reactive_extensions/cpl-reactive-extensions.json b/src/cpl_reactive_extensions/cpl-reactive-extensions.json new file mode 100644 index 00000000..c02adb45 --- /dev/null +++ b/src/cpl_reactive_extensions/cpl-reactive-extensions.json @@ -0,0 +1,44 @@ +{ + "ProjectSettings": { + "Name": "cpl-reactive-extensions", + "Version": { + "Major": "2023", + "Minor": "4", + "Micro": "dev170" + }, + "Author": "Sven Heidemann", + "AuthorEmail": "sven.heidemann@sh-edraft.de", + "Description": "CPL Simple ReactiveX implementation", + "LongDescription": "CPL Simple ReactiveX implementation, see RxJS and RxPy for detailed implementation.", + "URL": "https://www.sh-edraft.de", + "CopyrightDate": "2023", + "CopyrightName": "sh-edraft.de", + "LicenseName": "MIT", + "LicenseDescription": "MIT, see LICENSE for more details.", + "Dependencies": [ + "cpl-core>=2023.4.dev170" + ], + "DevDependencies": [ + "cpl-cli>=2023.4.0" + ], + "PythonVersion": ">=3.10.4", + "PythonPath": {}, + "Classifiers": [] + }, + "BuildSettings": { + "ProjectType": "library", + "SourcePath": "", + "OutputPath": "../../dist", + "Main": "", + "EntryPoint": "", + "IncludePackageData": false, + "Included": [], + "Excluded": [ + "*/__pycache__", + "*/logs", + "*/tests" + ], + "PackageData": {}, + "ProjectReferences": [] + } +} \ No newline at end of file diff --git a/src/cpl_reactive_extensions/helper/__init__.py b/src/cpl_reactive_extensions/helper/__init__.py new file mode 100644 index 00000000..0fa9697f --- /dev/null +++ b/src/cpl_reactive_extensions/helper/__init__.py @@ -0,0 +1,26 @@ +# -*- coding: utf-8 -*- + +""" +cpl-reactive-extensions CPL Simple ReactiveX implementation +~~~~~~~~~~~~~~~~~~~ + +CPL Simple ReactiveX implementation, see RxJS and RxPy for detailed implementation. + +:copyright: (c) 2023 sh-edraft.de +:license: MIT, see LICENSE for more details. + +""" + +__title__ = "cpl_reactive_extensions.helper" +__author__ = "Sven Heidemann" +__license__ = "MIT" +__copyright__ = "Copyright (c) 2023 sh-edraft.de" +__version__ = "2023.4.dev170" + +from collections import namedtuple + + +# imports: + +VersionInfo = namedtuple("VersionInfo", "major minor micro") +version_info = VersionInfo(major="2023", minor="4", micro="dev170") diff --git a/src/cpl_reactive_extensions/helper/bind.py b/src/cpl_reactive_extensions/helper/bind.py new file mode 100644 index 00000000..f617d601 --- /dev/null +++ b/src/cpl_reactive_extensions/helper/bind.py @@ -0,0 +1,11 @@ +def bind(instance, func, *args, as_name=None): + """ + Bind the function *func* to *instance*, with either provided name *as_name* + or the existing name of *func*. The provided *func* should accept the + instance as the first argument, i.e. "self". + """ + if as_name is None: + as_name = func.__name__ + bound_method = func.__get__(instance, instance.__class__) + setattr(instance, as_name, bound_method) + return bound_method diff --git a/src/cpl_reactive_extensions/internal/__init__.py b/src/cpl_reactive_extensions/internal/__init__.py new file mode 100644 index 00000000..7b09b429 --- /dev/null +++ b/src/cpl_reactive_extensions/internal/__init__.py @@ -0,0 +1,26 @@ +# -*- coding: utf-8 -*- + +""" +cpl-reactive-extensions CPL Simple ReactiveX implementation +~~~~~~~~~~~~~~~~~~~ + +CPL Simple ReactiveX implementation, see RxJS and RxPy for detailed implementation. + +:copyright: (c) 2023 sh-edraft.de +:license: MIT, see LICENSE for more details. + +""" + +__title__ = "cpl_reactive_extensions.internal" +__author__ = "Sven Heidemann" +__license__ = "MIT" +__copyright__ = "Copyright (c) 2023 sh-edraft.de" +__version__ = "2023.4.dev170" + +from collections import namedtuple + + +# imports: + +VersionInfo = namedtuple("VersionInfo", "major minor micro") +version_info = VersionInfo(major="2023", minor="4", micro="dev170") diff --git a/src/cpl_reactive_extensions/internal/action.py b/src/cpl_reactive_extensions/internal/action.py new file mode 100644 index 00000000..ce67cc75 --- /dev/null +++ b/src/cpl_reactive_extensions/internal/action.py @@ -0,0 +1,10 @@ +from cpl_core.type import T, Number +from cpl_reactive_extensions.internal.subscription import Subscription + + +class Action(Subscription): + def __init__(self, scheduler, work): + Subscription.__init__(self) + + def schedule(self, state: T = None, delay: Number = 0) -> Subscription: + return self diff --git a/src/cpl_reactive_extensions/internal/async_action.py b/src/cpl_reactive_extensions/internal/async_action.py new file mode 100644 index 00000000..30e89080 --- /dev/null +++ b/src/cpl_reactive_extensions/internal/async_action.py @@ -0,0 +1,103 @@ +from typing import Optional + +from cpl_core.type import T, Number +from cpl_reactive_extensions.internal.action import Action +from cpl_reactive_extensions.internal.subscription import Subscription +from cpl_reactive_extensions.timer import Timer + + +class AsyncAction(Action): + def __init__(self, scheduler, work): + from cpl_reactive_extensions.scheduler.async_scheduler import AsyncScheduler + + Action.__init__(self, scheduler, work) + + self._scheduler: AsyncScheduler = scheduler + self._work = work + + self.timer = None + self.state: Optional[T] = None + self.delay: Number = 0 + self._pending = False + + def schedule(self, state: T = None, delay: Number = 0) -> Subscription: + if self.closed: + return self + + self.state = state + + timer = self.timer + scheduler = self._scheduler + + if timer is not None: + self.timer = self.recycle_async_timer(scheduler, timer, delay) + + self._pending = True + self.delay = delay + self.timer = self.timer if self.timer is not None else self.request_async_timer(scheduler, delay) + + return self + + def request_async_timer(self, scheduler, delay: Number = 0): + from cpl_reactive_extensions.scheduler.async_scheduler import AsyncScheduler + + scheduler: AsyncScheduler = scheduler + return Timer(delay, lambda: scheduler.flush(self)) + + def recycle_async_timer(self, scheduler, timer=None, delay: Number = None): + from cpl_reactive_extensions.scheduler.async_scheduler import AsyncScheduler + + scheduler: AsyncScheduler = scheduler + if delay is None and self.delay == delay and not self._pending: + return timer + + if timer is not None: + timer.clear() + + return None + + def execute(self, state: T, delay: Number): + if self.closed: + return Exception("Executing cancelled action") + + self._pending = False + error = self._execute(state, delay) + if error is not None: + return error + elif not self._pending and self.timer is not None: + self._timer = self.recycle_async_timer(self._scheduler, self.timer, None) + + def _execute(self, state: T, delay: Number): + errored = False + ex = None + try: + self._work(state) + except Exception as e: + errored = True + ex = e + + if errored: + self.unsubscribe() + return ex + + def unsubscribe(self): + if self.closed: + return + + timer = self.timer + scheduler = self._scheduler + actions = self._scheduler.actions + + self._work = None + self.state = None + self._scheduler = None + self._pending = False + self.delay = None + + if self in actions: + actions.remove(self) + + if self.timer is not None: + self.timer = self.recycle_async_timer(scheduler, timer, None) + + Action.unsubscribe(self) diff --git a/src/cpl_reactive_extensions/internal/operator_subscriber.py b/src/cpl_reactive_extensions/internal/operator_subscriber.py new file mode 100644 index 00000000..77ba0cd7 --- /dev/null +++ b/src/cpl_reactive_extensions/internal/operator_subscriber.py @@ -0,0 +1,55 @@ +from typing import Callable + +from cpl_core.type import T +from cpl_reactive_extensions.abc.observer import Observer +from cpl_reactive_extensions.internal.subscriber import Subscriber + + +class OperatorSubscriber(Subscriber, Observer): + def __init__( + self, + destination: Subscriber, + on_next: Callable = None, + on_error: Callable = None, + on_complete: Callable = None, + on_finalize: Callable = None, + should_unsubscribe: Callable = None, + ): + Subscriber.__init__(self, destination) + self._on_finalize = on_finalize + self._should_unsubscribe = should_unsubscribe + + def on_next_wrapper(value: T): + try: + on_next(value) + except Exception as e: + destination.error(e) + + self._on_next = on_next_wrapper if on_next is not None else self._on_next + + def on_error_wrapper(value: T): + try: + on_error(value) + except Exception as e: + destination.error(e) + finally: + self.unsubscribe() + + self._on_error = on_error_wrapper if on_error is not None else self._on_error + + def on_complete_wrapper(value: T): + try: + on_complete(value) + except Exception as e: + destination.error(e) + finally: + self.unsubscribe() + + self._on_complete = on_complete_wrapper if on_complete is not None else self._on_complete + + def unsubscribe(self): + if self._should_unsubscribe is not None and not self._should_unsubscribe(): + return + Subscriber.unsubscribe(self) + if not self.closed and self._on_finalize is not None: + self._on_finalize() diff --git a/src/cpl_reactive_extensions/internal/subscriber.py b/src/cpl_reactive_extensions/internal/subscriber.py new file mode 100644 index 00000000..e2daeba3 --- /dev/null +++ b/src/cpl_reactive_extensions/internal/subscriber.py @@ -0,0 +1,65 @@ +from typing import Callable + +from cpl_core.type import T +from cpl_reactive_extensions.abc.observer import Observer +from cpl_reactive_extensions.internal.subscription import Subscription +from cpl_reactive_extensions.type import ObserverOrCallable + + +class Subscriber(Subscription, Observer): + def __init__( + self, on_next_or_observer: ObserverOrCallable, on_error: Callable = None, on_complete: Callable = None + ): + self.is_stopped = False + Subscription.__init__(self) + if isinstance(on_next_or_observer, Observer): + self._on_next = on_next_or_observer.next + self._on_error = on_next_or_observer.error + self._on_complete = on_next_or_observer.complete + else: + self._on_next = on_next_or_observer + self._on_error = on_error + self._on_complete = on_complete + + def _next(self, value: T): + self._on_next(value) + + def next(self, value: T): + if self.is_stopped: + raise Exception("Observer is closed") + + self._next(value) + + def _error(self, ex: Exception): + try: + self._on_error(ex) + except TypeError: + pass + finally: + self.unsubscribe() + + def error(self, ex: Exception): + self._error(ex) + + def _complete(self): + try: + self._on_complete() + except TypeError: + pass + finally: + self.unsubscribe() + + def complete(self): + if self.is_stopped: + return + + self.is_stopped = True + self._complete() + + def unsubscribe(self): + if self._closed: + return + + self.is_stopped = True + Subscription.unsubscribe(self) + self._on_next = None diff --git a/src/cpl_reactive_extensions/internal/subscription.py b/src/cpl_reactive_extensions/internal/subscription.py new file mode 100644 index 00000000..bd27df58 --- /dev/null +++ b/src/cpl_reactive_extensions/internal/subscription.py @@ -0,0 +1,96 @@ +from __future__ import annotations + +import traceback +from typing import Union, Callable, Optional + +from cpl_core.console import Console +from cpl_reactive_extensions.abc.unsubscribable import Unsubscribable + + +class Subscription(Unsubscribable): + @staticmethod + def empty(): + empty = Subscription() + empty.closed = True + return empty + + def __init__(self, initial_teardown: Optional[Callable] = None): + Unsubscribable.__init__(self) + + self._initial_teardown = initial_teardown + + self._closed = False + + self._parentage: list[Subscription] = [] + self._finalizers: list[Subscription] = [] + + @property + def closed(self) -> bool: + return self._closed + + @closed.setter + def closed(self, value: bool): + self._closed = value + + def _add_parent(self, parent: Subscription): + self._parentage.append(parent) + + def _remove_parent(self, parent: Subscription): + if self == parent: + self._parentage.clear() + return + + self._parentage.remove(parent) + + def _has_parent(self, parent: Subscription) -> bool: + return parent in self._parentage + + def _exec_finalizer(self, finalizer: Union[Callable, Unsubscribable]): + if isinstance(finalizer, Callable): + finalizer() + else: + finalizer.unsubscribe() + + def unsubscribe(self): + if not self._closed: + self._closed = True + + for parent in self._parentage: + parent.remove(self) + + if self._initial_teardown is not None: + try: + self._initial_teardown() + except Exception as e: + Console.error(e, traceback.format_exc()) + + finalizers = self._finalizers + self._finalizers = None + for finalizer in finalizers: + try: + self._exec_finalizer(finalizer) + except Exception as e: + Console.error(e, traceback.format_exc()) + + def add(self, tear_down: Union[Subscription, Unsubscribable]): + if tear_down is None or tear_down == self: + return + + if self.closed: + self._exec_finalizer(tear_down) + return + + if isinstance(tear_down, Subscription): + if tear_down.closed or tear_down._has_parent(self): + return + + tear_down._add_parent(self) + + self._finalizers.append(tear_down) + + def remove(self, tear_down: Union[Subscription, Unsubscribable]): + if self._finalizers is not None: + self._finalizers.remove(tear_down) + + if isinstance(tear_down, Subscription): + tear_down._remove_parent(self) diff --git a/src/cpl_reactive_extensions/internal/timer_provider.py b/src/cpl_reactive_extensions/internal/timer_provider.py new file mode 100644 index 00000000..aafd0314 --- /dev/null +++ b/src/cpl_reactive_extensions/internal/timer_provider.py @@ -0,0 +1,10 @@ +from typing import Callable + +from cpl_core.type import Number +from cpl_reactive_extensions.timer import Timer + + +class TimerProvider: + @staticmethod + def set_timer(handler: Callable, timeout: Number = None, *args): + return Timer(timeout, handler, *args) diff --git a/src/cpl_reactive_extensions/internal/utils.py b/src/cpl_reactive_extensions/internal/utils.py new file mode 100644 index 00000000..b02a7bbb --- /dev/null +++ b/src/cpl_reactive_extensions/internal/utils.py @@ -0,0 +1,20 @@ +from typing import Callable + +from cpl_reactive_extensions.observable import Observable +from cpl_reactive_extensions.internal.subscriber import Subscriber + + +def operate(init: Callable[[Observable, Subscriber], None]): + def observable(source: Observable): + def create(self: Subscriber, lifted_source: Observable): + try: + return init(lifted_source, self) + except Exception as e: + self.error(e) + + if "lift" not in dir(source): + raise TypeError("Unable to lift unknown Observable type") + + return source.lift(create) + + return observable diff --git a/src/cpl_reactive_extensions/interval.py b/src/cpl_reactive_extensions/interval.py new file mode 100644 index 00000000..464dcd45 --- /dev/null +++ b/src/cpl_reactive_extensions/interval.py @@ -0,0 +1,47 @@ +import sched +import threading +import time +from typing import Callable + +from cpl_reactive_extensions.internal.subscriber import Subscriber +from cpl_reactive_extensions.observable import Observable + + +class Interval(Observable): + def __init__(self, interval: float, callback: Callable = None, not_in_background=False): + self._interval = interval + callback = callback if callback is not None else self._default_callback + + def schedule(x: Subscriber): + scheduler = sched.scheduler(time.time, time.sleep) + scheduler.enter( + self._interval, + 1, + self._run, + (scheduler, x, callback), + ) + scheduler.run() + + def thread(x: Subscriber): + t = threading.Thread(target=schedule, args=(x,)) + t.start() + + Observable.__init__(self, schedule if not_in_background else thread) + self._i = 0 + + def _run(self, scheduler, x: Subscriber, callback: Callable): + if x.closed: + x.complete() + return + + scheduler.enter( + self._interval, + 1, + self._run, + (scheduler, x, callback), + ) + callback(x) + + def _default_callback(self, x: Subscriber): + x.next(self._i) + self._i += 1 diff --git a/src/cpl_reactive_extensions/observable.py b/src/cpl_reactive_extensions/observable.py new file mode 100644 index 00000000..dfec039b --- /dev/null +++ b/src/cpl_reactive_extensions/observable.py @@ -0,0 +1,114 @@ +from __future__ import annotations + +from typing import Callable, Any, Optional + +from cpl_core.type import T +from cpl_reactive_extensions.abc.observer import Observer +from cpl_reactive_extensions.abc.subscribable import Subscribable +from cpl_reactive_extensions.internal.subscriber import Subscriber +from cpl_reactive_extensions.internal.subscription import Subscription +from cpl_reactive_extensions.type import ObserverOrCallable + + +class Observable(Subscribable): + def __init__(self, subscribe: Callable = None): + Subscribable.__init__(self) + if subscribe is not None: + self._subscribe = subscribe + + self._source: Optional[Observable] = None + self._operator: Optional[Callable] = None + + @staticmethod + def from_observable(obs: Observable): + def inner(subscriber: Subscriber): + if "subscribe" not in dir(obs): + raise TypeError("Unable to lift unknown Observable type") + + return obs.subscribe(subscriber) + + return Observable(inner) + + @staticmethod + def from_list(values: list): + i = 0 + + def callback(x: Subscriber): + nonlocal i + if i == len(values): + i = 0 + x.complete() + else: + x.next(values[i]) + i += 1 + + if not x.closed: + callback(x) + + observable = Observable(callback) + return observable + + def lift(self, operator: Callable) -> Observable: + observable = Observable() + observable._source = self + observable._operator = operator + return observable + + @staticmethod + def _is_observer(value: Any) -> bool: + return isinstance(value, Observer) + + @staticmethod + def _is_subscription(value: Any) -> bool: + return isinstance(value, Subscription) + + @staticmethod + def _is_subscriber(value: Any) -> bool: + return isinstance(value, Subscriber) or Observable._is_observer(value) and Observable._is_subscription(value) + + def _subscribe(self, subscriber: Subscriber) -> Subscription: + return self._source.subscribe(subscriber) + + def _try_subscribe(self, subscriber: Subscriber) -> Subscription: + try: + return self._subscribe(subscriber) + except Exception as e: + subscriber.error(e) + + def subscribe( + self, observer_or_next: ObserverOrCallable, on_error: Callable = None, on_complete: Callable = None + ) -> Subscription: + subscriber = ( + observer_or_next + if Observable._is_subscriber(observer_or_next) + else Subscriber(observer_or_next, on_error, on_complete) + ) + + subscriber.add( + self._operator(subscriber, self._source) + if self._operator is not None + else self._subscribe(subscriber) + if self._source is not None + else self._try_subscribe(subscriber) + ) + + return subscriber + + def pipe(self, *args) -> Observable: + return self._pipe_from_array(args)(self) + + def _pipe_from_array(self, args): + if len(args) == 0: + return lambda x: x + + if len(args) == 1: + return args[0] + + def piped(input: T): + return Observable._reduce(lambda prev, fn: fn(prev), input) + + return piped + + @staticmethod + def _reduce(func: Callable, input: T): + return func(input) diff --git a/src/cpl_reactive_extensions/operators/__init__.py b/src/cpl_reactive_extensions/operators/__init__.py new file mode 100644 index 00000000..45eda886 --- /dev/null +++ b/src/cpl_reactive_extensions/operators/__init__.py @@ -0,0 +1,26 @@ +# -*- coding: utf-8 -*- + +""" +cpl-reactive-extensions CPL Simple ReactiveX implementation +~~~~~~~~~~~~~~~~~~~ + +CPL Simple ReactiveX implementation, see RxJS and RxPy for detailed implementation. + +:copyright: (c) 2023 sh-edraft.de +:license: MIT, see LICENSE for more details. + +""" + +__title__ = "cpl_reactive_extensions.operators" +__author__ = "Sven Heidemann" +__license__ = "MIT" +__copyright__ = "Copyright (c) 2023 sh-edraft.de" +__version__ = "2023.4.dev170" + +from collections import namedtuple + + +# imports: + +VersionInfo = namedtuple("VersionInfo", "major minor micro") +version_info = VersionInfo(major="2023", minor="4", micro="dev170") diff --git a/src/cpl_reactive_extensions/operators/debounce_time.py b/src/cpl_reactive_extensions/operators/debounce_time.py new file mode 100644 index 00000000..4a0ccfd8 --- /dev/null +++ b/src/cpl_reactive_extensions/operators/debounce_time.py @@ -0,0 +1,62 @@ +from typing import Optional + +from cpl_core.type import T, Number +from cpl_reactive_extensions.abc.scheduler_action import SchedulerAction +from cpl_reactive_extensions.internal.operator_subscriber import OperatorSubscriber +from cpl_reactive_extensions.internal.subscriber import Subscriber +from cpl_reactive_extensions.internal.subscription import Subscription +from cpl_reactive_extensions.internal.utils import operate +from cpl_reactive_extensions.observable import Observable +from cpl_reactive_extensions.scheduler.async_scheduler import async_scheduler + + +def debounce_time(time: Number, scheduler=async_scheduler): + def init(source: Observable, subscriber: Subscriber): + active_task: Optional[Subscription] = None + last_value: Optional[T] = None + last_time: Optional[Number] = None + + def emit(): + nonlocal active_task, last_value + + if active_task is None: + return + + active_task.unsubscribe() + active_task = None + value = last_value + last_value = None + subscriber.next(value) + + def emit_when_idle(action: SchedulerAction): + nonlocal active_task, last_time + target_time = last_time + time + now = scheduler.now + + if now < target_time: + active_task = action.schedule(None, target_time - now) + subscriber.add(active_task) + return + + emit() + + def on_next(value: T): + nonlocal active_task, last_value + last_value = value + + if active_task is None: + active_task = scheduler.schedule(emit_when_idle, time) + subscriber.add(active_task) + + def on_complete(): + emit() + subscriber.complete() + + def on_finalize(): + nonlocal active_task, last_value + last_value = None + active_task = None + + sub = source.subscribe(OperatorSubscriber(subscriber, on_next, None, on_complete, on_finalize)) + + return operate(init) diff --git a/src/cpl_reactive_extensions/operators/take.py b/src/cpl_reactive_extensions/operators/take.py new file mode 100644 index 00000000..43c34cc0 --- /dev/null +++ b/src/cpl_reactive_extensions/operators/take.py @@ -0,0 +1,29 @@ +from cpl_core.type import T +from cpl_reactive_extensions.internal.subscriber import Subscriber +from cpl_reactive_extensions.observable import Observable +from cpl_reactive_extensions.internal.operator_subscriber import OperatorSubscriber +from cpl_reactive_extensions.internal.utils import operate + + +def take(count: int): + if count <= 0: + return Observable() + + def init(source: Observable, subscriber: Subscriber): + seen = 0 + + def on_next(value: T): + nonlocal seen + + if seen + 1 <= count: + seen += 1 + subscriber.next(value) + + if count <= seen: + subscriber.complete() + else: + sub.unsubscribe() + + sub = source.subscribe(OperatorSubscriber(subscriber, on_next)) + + return operate(init) diff --git a/src/cpl_reactive_extensions/operators/take_until.py b/src/cpl_reactive_extensions/operators/take_until.py new file mode 100644 index 00000000..9363291f --- /dev/null +++ b/src/cpl_reactive_extensions/operators/take_until.py @@ -0,0 +1,14 @@ +from cpl_reactive_extensions.observable import Observable +from cpl_reactive_extensions.internal.operator_subscriber import OperatorSubscriber +from cpl_reactive_extensions.internal.subscriber import Subscriber +from cpl_reactive_extensions.internal.utils import operate + + +def take_until(notifier: Observable): + def init(source: Observable, subscriber: Subscriber): + Observable.from_observable(notifier).subscribe(OperatorSubscriber(subscriber, lambda: subscriber.complete())) + + if not subscriber.closed: + source.subscribe(subscriber) + + return operate(init) diff --git a/src/cpl_reactive_extensions/scheduler/__init__.py b/src/cpl_reactive_extensions/scheduler/__init__.py new file mode 100644 index 00000000..a7c22f9c --- /dev/null +++ b/src/cpl_reactive_extensions/scheduler/__init__.py @@ -0,0 +1,26 @@ +# -*- coding: utf-8 -*- + +""" +cpl-reactive-extensions CPL Simple ReactiveX implementation +~~~~~~~~~~~~~~~~~~~ + +CPL Simple ReactiveX implementation, see RxJS and RxPy for detailed implementation. + +:copyright: (c) 2023 sh-edraft.de +:license: MIT, see LICENSE for more details. + +""" + +__title__ = "cpl_reactive_extensions.scheduler" +__author__ = "Sven Heidemann" +__license__ = "MIT" +__copyright__ = "Copyright (c) 2023 sh-edraft.de" +__version__ = "2023.4.dev170" + +from collections import namedtuple + + +# imports: + +VersionInfo = namedtuple("VersionInfo", "major minor micro") +version_info = VersionInfo(major="2023", minor="4", micro="dev170") diff --git a/src/cpl_reactive_extensions/scheduler/async_scheduler.py b/src/cpl_reactive_extensions/scheduler/async_scheduler.py new file mode 100644 index 00000000..7636abd9 --- /dev/null +++ b/src/cpl_reactive_extensions/scheduler/async_scheduler.py @@ -0,0 +1,36 @@ +from typing import Type + +from cpl_reactive_extensions.internal.action import Action +from cpl_reactive_extensions.internal.async_action import AsyncAction +from cpl_reactive_extensions.scheduler.scheduler import Scheduler + + +class AsyncScheduler(Scheduler): + def __init__(self, scheduler_action_ctor: Type[Action], now=None): + Scheduler.__init__(self, scheduler_action_ctor, now) + + self.actions: list[AsyncAction] = [] + self._active = False + + def flush(self, action: AsyncAction): + if self._active: + self.actions.append(action) + return + + error = None + self._active = True + + for action in self.actions: + error = action.execute(action.state, action.delay) + if error: + break + + self._active = False + + if error is not None: + for action in self.actions: + action.unsubscribe() + raise error + + +async_scheduler = AsyncScheduler(AsyncAction) diff --git a/src/cpl_reactive_extensions/scheduler/scheduler.py b/src/cpl_reactive_extensions/scheduler/scheduler.py new file mode 100644 index 00000000..894a62b7 --- /dev/null +++ b/src/cpl_reactive_extensions/scheduler/scheduler.py @@ -0,0 +1,29 @@ +from datetime import datetime +from typing import Callable, Optional, Type + +from cpl_core.type import T, Number +from cpl_reactive_extensions.abc.scheduler_action import SchedulerAction +from cpl_reactive_extensions.abc.scheduler_like import SchedulerLike +from cpl_reactive_extensions.internal.action import Action +from cpl_reactive_extensions.internal.subscription import Subscription + + +class Scheduler(SchedulerLike): + @staticmethod + @property + def _get_now(self=None) -> Number: + return int(datetime.now().strftime("%s")) + + now = _get_now + + def __init__(self, scheduler_action_ctor: Type[Action], now=None): + self.now = self._get_now if now is None else now + self._scheduler_action_ctor = scheduler_action_ctor + + def schedule( + self, work: Callable[[SchedulerAction, Optional[T]], None], delay: Number, state: T = None + ) -> Subscription: + action = self._scheduler_action_ctor(self, work) + x = action.schedule(state, delay) + + return x diff --git a/src/cpl_reactive_extensions/subject/__init__.py b/src/cpl_reactive_extensions/subject/__init__.py new file mode 100644 index 00000000..e01be62a --- /dev/null +++ b/src/cpl_reactive_extensions/subject/__init__.py @@ -0,0 +1,26 @@ +# -*- coding: utf-8 -*- + +""" +cpl-reactive-extensions CPL Simple ReactiveX implementation +~~~~~~~~~~~~~~~~~~~ + +CPL Simple ReactiveX implementation, see RxJS and RxPy for detailed implementation. + +:copyright: (c) 2023 sh-edraft.de +:license: MIT, see LICENSE for more details. + +""" + +__title__ = "cpl_reactive_extensions.subject" +__author__ = "Sven Heidemann" +__license__ = "MIT" +__copyright__ = "Copyright (c) 2023 sh-edraft.de" +__version__ = "2023.4.dev170" + +from collections import namedtuple + + +# imports: + +VersionInfo = namedtuple("VersionInfo", "major minor micro") +version_info = VersionInfo(major="2023", minor="4", micro="dev170") diff --git a/src/cpl_reactive_extensions/subject/behavior_subject.py b/src/cpl_reactive_extensions/subject/behavior_subject.py new file mode 100644 index 00000000..5a9c76dc --- /dev/null +++ b/src/cpl_reactive_extensions/subject/behavior_subject.py @@ -0,0 +1,24 @@ +from typing import Type + +from cpl_core.type import T +from cpl_reactive_extensions.subject.subject import Subject + + +class BehaviorSubject(Subject): + def __init__(self, _t: Type[T], value: T): + Subject.__init__(self, _t) + + if not isinstance(value, _t): + raise TypeError(f"Expected {_t.__name__} not {type(value).__name__}") + + self._t = _t + self._value = value + + @property + def value(self) -> T: + return self._value + + def next(self, value: T): + super().next(value) + + self._value = value diff --git a/src/cpl_reactive_extensions/subject/subject.py b/src/cpl_reactive_extensions/subject/subject.py new file mode 100644 index 00000000..0e157f5d --- /dev/null +++ b/src/cpl_reactive_extensions/subject/subject.py @@ -0,0 +1,101 @@ +from types import NoneType +from typing import Any, Optional, Type + +from cpl_core.type import T +from cpl_reactive_extensions.abc.observer import Observer +from cpl_reactive_extensions.observable import Observable +from cpl_reactive_extensions.internal.subscriber import Subscriber +from cpl_reactive_extensions.internal.subscription import Subscription + + +class Subject(Observable, Observer): + def __init__(self, _t: Type[T]): + Observable.__init__(self) + + self.is_closed = False + self._t = _t if _t is not None else NoneType + self._current_observers: Optional[list[Observer]] = None + + self.closed = False + self.observers: list[Observer] = [] + self.is_stopped = False + self.has_error = False + self.raised_error: Any = None + + @property + def observed(self) -> bool: + return len(self.observers) > 0 + + def _raise_if_closed(self): + if not self.closed: + return + raise Exception("Subject is unsubscribed!") + + def next(self, value: T): + self._raise_if_closed() + + if not isinstance(value, self._t): + raise TypeError(f"Expected {self._t.__name__} not {type(value).__name__}") + + if self.is_stopped: + return + + if self._current_observers is None: + self._current_observers = self.observers + + for observer in self._current_observers: + observer.next(value) + + def error(self, error: Exception): + self._raise_if_closed() + if self.is_stopped: + return + + self.is_stopped = True + self.has_error = self.is_stopped + for observer in self.observers: + observer.error(error) + + def complete(self): + self._raise_if_closed() + + if self.is_stopped: + return + + self.is_stopped = True + for observer in self.observers: + observer.complete() + + def unsubscribe(self): + self.is_stopped = True + self.is_closed = True + self._current_observers = None + self.observers = [] + + def _try_subscribe(self, subscriber: Subscriber): + self._raise_if_closed() + return super()._try_subscribe(subscriber) + + def _subscribe(self, subscriber: Subscriber) -> Subscription: + self._raise_if_closed() + self._check_finalized_statuses(subscriber) + return self._inner_subscribe(subscriber) + + def _check_finalized_statuses(self, subscriber: Subscriber): + if self.has_error: + subscriber.error(self.raised_error) + elif self.is_stopped: + subscriber.complete() + + def _inner_subscribe(self, subscriber: Subscriber) -> Optional[Subscription]: + if self.has_error or self.is_stopped: + return Subscription.empty() + + self._current_observers = None + self.observers.append(subscriber) + + def _initial(): + self._current_observers = None + self.observers.remove(subscriber) + + return Subscription(_initial) diff --git a/src/cpl_reactive_extensions/timer.py b/src/cpl_reactive_extensions/timer.py new file mode 100644 index 00000000..4b3a688d --- /dev/null +++ b/src/cpl_reactive_extensions/timer.py @@ -0,0 +1,24 @@ +import threading +import time +from typing import Callable + +from cpl_core.type import Number + + +class Timer: + def __init__(self, interval: Number, action: Callable, *args): + self._interval = interval / 1000 + self._action = action + self._args = args + self.stop_event = threading.Event() + thread = threading.Thread(target=self.__set_interval) + thread.start() + + def __set_interval(self): + next_time = time.time() + self._interval + while not self.stop_event.wait(next_time - time.time()): + next_time += self._interval + self._action(*self._args) + + def clear(self): + self.stop_event.set() diff --git a/src/cpl_reactive_extensions/type.py b/src/cpl_reactive_extensions/type.py new file mode 100644 index 00000000..4c478ec6 --- /dev/null +++ b/src/cpl_reactive_extensions/type.py @@ -0,0 +1,5 @@ +from typing import Callable, Union + +from cpl_reactive_extensions.abc.observer import Observer + +ObserverOrCallable = Union[Callable, Observer] diff --git a/unittests/unittests/application.py b/unittests/unittests/application.py index 932fb34b..95a4854f 100644 --- a/unittests/unittests/application.py +++ b/unittests/unittests/application.py @@ -6,6 +6,7 @@ from cpl_core.dependency_injection import ServiceProviderABC from unittests_cli.cli_test_suite import CLITestSuite from unittests_core.core_test_suite import CoreTestSuite from unittests_query.query_test_suite import QueryTestSuite +from unittests_reactive_extenstions.reactive_test_suite import ReactiveTestSuite from unittests_translation.translation_test_suite import TranslationTestSuite @@ -21,4 +22,5 @@ class Application(ApplicationABC): runner.run(CoreTestSuite()) runner.run(CLITestSuite()) runner.run(QueryTestSuite()) + runner.run(ReactiveTestSuite()) runner.run(TranslationTestSuite()) diff --git a/unittests/unittests_reactive_extenstions/__init__.py b/unittests/unittests_reactive_extenstions/__init__.py new file mode 100644 index 00000000..425ab6c1 --- /dev/null +++ b/unittests/unittests_reactive_extenstions/__init__.py @@ -0,0 +1 @@ +# imports diff --git a/unittests/unittests_reactive_extenstions/observable_operator_test_case.py b/unittests/unittests_reactive_extenstions/observable_operator_test_case.py new file mode 100644 index 00000000..95779456 --- /dev/null +++ b/unittests/unittests_reactive_extenstions/observable_operator_test_case.py @@ -0,0 +1,88 @@ +import time +import traceback +import unittest + +from cpl_core.console import Console +from cpl_reactive_extensions.observable import Observable +from cpl_reactive_extensions.operators.debounce_time import debounce_time +from cpl_reactive_extensions.subject.subject import Subject +from cpl_reactive_extensions.interval import Interval +from cpl_reactive_extensions.operators.take import take +from cpl_reactive_extensions.operators.take_until import take_until + + +class ObservableOperatorTestCase(unittest.TestCase): + def setUp(self): + self._error = False + self._completed = False + + def _on_error(self, ex: Exception): + tb = traceback.format_exc() + Console.error(f"Got error from observable: {ex}", tb) + self._error = True + + def _on_complete(self): + self._completed = True + + def test_take_two(self): + count = 0 + + def sub(x): + nonlocal count + + count += 1 + + observable = Interval(0.1) + sub = observable.pipe(take(2)).subscribe(sub) + time.sleep(0.5) + self.assertEqual(count, 2) + sub.unsubscribe() + + def test_take_five(self): + count = 0 + + def sub(x): + nonlocal count + + count += 1 + + observable = Interval(0.1) + sub = observable.pipe(take(5)).subscribe(sub) + time.sleep(1) + self.assertEqual(count, 5) + sub.unsubscribe() + + def test_take_until(self): + count = 0 + unsubscriber = Subject(None) + + def sub(x): + nonlocal count + + count += 1 + + observable = Interval(0.1) + observable.pipe(take_until(unsubscriber)).subscribe(sub) + + timer = 2 + time.sleep(timer) + unsubscriber.next(None) + unsubscriber.complete() + self.assertEqual(count, timer * 10 - 1) + + # def test_debounce_time(self): + # def call(x): + # x.next(1) + # x.next(2) + # x.next(3) + # x.next(4) + # x.next(5) + # x.next(6) + # x.complete() + # + # observable = Observable(call) + # + # sub = observable.pipe(debounce_time(600)).subscribe(lambda x: Console.write_line("Hey", x)) + # + # time.sleep(2) + # sub.unsubscribe() diff --git a/unittests/unittests_reactive_extenstions/reactive_test_case.py b/unittests/unittests_reactive_extenstions/reactive_test_case.py new file mode 100644 index 00000000..10b35bfb --- /dev/null +++ b/unittests/unittests_reactive_extenstions/reactive_test_case.py @@ -0,0 +1,176 @@ +import time +import traceback +import unittest +from datetime import datetime +from threading import Timer + +from cpl_core.console import Console +from cpl_reactive_extensions.subject.behavior_subject import BehaviorSubject +from cpl_reactive_extensions.interval import Interval +from cpl_reactive_extensions.observable import Observable +from cpl_reactive_extensions.subject.subject import Subject +from cpl_reactive_extensions.internal.subscriber import Observer + + +class ReactiveTestCase(unittest.TestCase): + def setUp(self): + self._error = False + self._completed = False + + def _on_error(self, ex: Exception): + tb = traceback.format_exc() + Console.error(f"Got error from observable: {ex}", tb) + self._error = True + + def _on_complete(self): + self._completed = True + + def test_observer(self): + called = 0 + test_x = 1 + + def callback(observer: Observer): + nonlocal test_x + observer.next(test_x) + test_x += 1 + observer.next(test_x) + test_x += 1 + observer.next(test_x) + + def complete(): + nonlocal test_x + test_x += 1 + observer.next(test_x) + observer.complete() + + Timer(1.0, complete).start() + + observable = Observable(callback) + + def on_next(x): + nonlocal called + called += 1 + self.assertEqual(test_x, x) + + self.assertEqual(called, 0) + self.assertFalse(self._error) + self.assertFalse(self._completed) + observable.subscribe( + on_next, + self._on_error, + self._on_complete, + ) + self.assertEqual(called, 3) + self.assertFalse(self._error) + self.assertFalse(self._completed) + + def complete(): + self.assertEqual(called, 4) + self.assertFalse(self._error) + self.assertTrue(self._completed) + + Timer(1.0, complete).start() + + def test_observer_completed(self): + def _test_complete(x: Observer): + x.next(1) + x.next(2) + x.complete() + x.next(3) + + observable = Observable(_test_complete) + + observable.subscribe(lambda x: Console.write_line(1, x), self._on_error) + self.assertTrue(self._error) + + def test_observable_from(self): + expected_x = 1 + + def _next(x): + nonlocal expected_x + self.assertEqual(expected_x, x) + expected_x += 1 + + observable = Observable.from_list([1, 2, 3, 4]) + sub = observable.subscribe( + _next, + self._on_error, + ) + self.assertFalse(self._error) + sub.unsubscribe() + + def test_subject(self): + expected_x = 1 + + def _next(calc, x): + nonlocal expected_x + self.assertEqual(expected_x, x) + if not calc: + return + expected_x += 1 + if expected_x == 4: + expected_x = 1 + + subject = Subject(int) + subject.subscribe(lambda x: _next(False, x), self._on_error) + subject.subscribe(lambda x: _next(True, x), self._on_error) + + observable = Observable.from_list([1, 2, 3]) + sub = observable.subscribe(subject, self._on_error) + + self.assertFalse(self._error) + sub.unsubscribe() + + def test_behavior_subject(self): + subject = BehaviorSubject(int, 0) + + subject.subscribe(lambda x: Console.write_line("a", x)) + + subject.next(1) + subject.next(2) + + subject.subscribe(lambda x: Console.write_line("b", x)) + subject.next(3) + subject.unsubscribe() + + def test_interval_default(self): + wait = 10 + i = 0 + + def test_sub(x): + nonlocal i + self.assertEqual(x, i) + i += 1 + + observable = Interval(1.0) + sub = observable.subscribe(test_sub) + start = datetime.now() + + time.sleep(wait) + sub.unsubscribe() + end = datetime.now() + self.assertEqual(round((end - start).total_seconds()), wait) + + def test_interval_custom(self): + wait = 10 + i = 0 + n = 0 + + def callback(x: Observer): + nonlocal n + x.next(n) + n += 1 + + def test_sub(x): + nonlocal i + self.assertEqual(x, i) + i += 1 + + observable = Interval(1.0, callback) + sub = observable.subscribe(test_sub) + start = datetime.now() + + time.sleep(wait) + sub.unsubscribe() + end = datetime.now() + self.assertEqual(round((end - start).total_seconds()), wait) diff --git a/unittests/unittests_reactive_extenstions/reactive_test_suite.py b/unittests/unittests_reactive_extenstions/reactive_test_suite.py new file mode 100644 index 00000000..d0d541b7 --- /dev/null +++ b/unittests/unittests_reactive_extenstions/reactive_test_suite.py @@ -0,0 +1,23 @@ +import unittest + +from unittests_reactive_extenstions.observable_operator_test_case import ObservableOperatorTestCase +from unittests_reactive_extenstions.reactive_test_case import ReactiveTestCase +from unittests_reactive_extenstions.scheduler_test_case import SchedulerTestCase + + +class ReactiveTestSuite(unittest.TestSuite): + def __init__(self): + unittest.TestSuite.__init__(self) + + loader = unittest.TestLoader() + self.addTests(loader.loadTestsFromTestCase(ReactiveTestCase)) + self.addTests(loader.loadTestsFromTestCase(ObservableOperatorTestCase)) + self.addTests(loader.loadTestsFromTestCase(SchedulerTestCase)) + + def run(self, *args): + super().run(*args) + + +if __name__ == "__main__": + runner = unittest.TextTestRunner() + runner.run(ReactiveTestSuite()) diff --git a/unittests/unittests_reactive_extenstions/scheduler_test_case.py b/unittests/unittests_reactive_extenstions/scheduler_test_case.py new file mode 100644 index 00000000..18c1523b --- /dev/null +++ b/unittests/unittests_reactive_extenstions/scheduler_test_case.py @@ -0,0 +1,35 @@ +import time +import unittest +from datetime import datetime + +from cpl_core.console import Console +from cpl_reactive_extensions.timer import Timer + + +class SchedulerTestCase(unittest.TestCase): + def setUp(self): + pass + + def test_timer(self): + count = 0 + + def task(): + nonlocal count + Console.write_line(datetime.now(), "Hello world") + count += 1 + + timer = Timer(100, task) + time.sleep(0.25) + self.assertEqual(count, 2) + timer.clear() + + def test_schedule(self): + count = 0 + + def task(): + nonlocal count + Console.write_line(datetime.now(), "Hello world") + count += 1 + + # async_scheduler.schedule(task, 100) + time.sleep(2) diff --git a/unittests/unittests_reactive_extenstions/test.py b/unittests/unittests_reactive_extenstions/test.py new file mode 100644 index 00000000..f620d934 --- /dev/null +++ b/unittests/unittests_reactive_extenstions/test.py @@ -0,0 +1,21 @@ +import time +from datetime import datetime + +from cpl_core.console import Console +from cpl_reactive_extensions.timer import Timer + + +def test_timer(): + is_working = False + + def task(): + nonlocal is_working + Console.write_line(datetime.now(), "Hello world") + is_working = True + + timer = Timer(100, task) + time.sleep(0.2) + timer.clear() + + +test_timer() diff --git a/unittests/unittests_reactive_extenstions/unittests_reactive_extenstions.json b/unittests/unittests_reactive_extenstions/unittests_reactive_extenstions.json new file mode 100644 index 00000000..bc23ec7a --- /dev/null +++ b/unittests/unittests_reactive_extenstions/unittests_reactive_extenstions.json @@ -0,0 +1,46 @@ +{ + "ProjectSettings": { + "Name": "unittests_reactive_extenstions", + "Version": { + "Major": "2023", + "Minor": "4", + "Micro": "dev170" + }, + "Author": "", + "AuthorEmail": "", + "Description": "", + "LongDescription": "", + "URL": "", + "CopyrightDate": "", + "CopyrightName": "", + "LicenseName": "", + "LicenseDescription": "", + "Dependencies": [ + "cpl-core>=2023.4.dev170" + ], + "DevDependencies": [ + "cpl-cli>=2023.4.0" + ], + "PythonVersion": ">=3.10.4", + "PythonPath": { + "linux": "" + }, + "Classifiers": [] + }, + "BuildSettings": { + "ProjectType": "unittest", + "SourcePath": "", + "OutputPath": "../../dist", + "Main": "unittests_reactive_extenstions.main", + "EntryPoint": "unittests_reactive_extenstions", + "IncludePackageData": false, + "Included": [], + "Excluded": [ + "*/__pycache__", + "*/logs", + "*/tests" + ], + "PackageData": {}, + "ProjectReferences": [] + } +} \ No newline at end of file