Files
cpl/test/core/utils/cache_test.py
clu ca58f636ee
Some checks failed
Test before pr merge / test-lint (pull_request) Failing after 13s
Test before pr merge / test (pull_request) Successful in 40s
test(core): add unit tests for untested core modules
Adds 113 tests covering:
- abc: RegistryABC (concrete implementation + edge cases)
- environment: Environment get/set, EnvironmentEnum
- pipes: BoolPipe, IPAddressPipe (incl. roundtrip + error cases)
- time: Cron (next(), intervals, invalid expression)
- utils: Cache (TTL, expiry, cleanup), get_value (incl. bug
  documentation: cast result not returned for string->typed values),
  JSONProcessor (nested objects, enums, defaults)
- property: classproperty (class access, instance access, subclass)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-13 18:40:01 +02:00

126 lines
2.5 KiB
Python

import time
import pytest
from cpl.core.utils.cache import Cache
def test_set_and_get():
cache = Cache()
cache.set("key1", "value1")
assert cache.get("key1") == "value1"
cache.stop()
def test_get_missing_key_returns_none():
cache = Cache()
assert cache.get("nonexistent") is None
cache.stop()
def test_has_existing_key():
cache = Cache()
cache.set("key", "val")
assert cache.has("key") is True
cache.stop()
def test_has_missing_key():
cache = Cache()
assert cache.has("missing") is False
cache.stop()
def test_delete():
cache = Cache()
cache.set("key", "val")
cache.delete("key")
assert cache.get("key") is None
assert cache.has("key") is False
cache.stop()
def test_clear():
cache = Cache()
cache.set("a", 1)
cache.set("b", 2)
cache.clear()
assert cache.get("a") is None
assert cache.get("b") is None
cache.stop()
def test_get_all():
cache = Cache()
cache.set("x", 10)
cache.set("y", 20)
values = cache.get_all()
assert set(values) == {10, 20}
cache.stop()
def test_get_all_empty():
cache = Cache()
assert cache.get_all() == []
cache.stop()
def test_ttl_expiry():
cache = Cache()
cache.set("temp", "data", ttl=1)
assert cache.get("temp") == "data"
time.sleep(1.1)
assert cache.get("temp") is None
cache.stop()
def test_has_after_ttl_expiry():
cache = Cache()
cache.set("temp", "data", ttl=1)
time.sleep(1.1)
assert cache.has("temp") is False
cache.stop()
def test_default_ttl():
cache = Cache(default_ttl=1)
cache.set("key", "value")
assert cache.get("key") == "value"
time.sleep(1.1)
assert cache.get("key") is None
cache.stop()
def test_per_key_ttl_overrides_default():
cache = Cache(default_ttl=60)
cache.set("short", "val", ttl=1)
time.sleep(1.1)
assert cache.get("short") is None
cache.stop()
def test_get_all_excludes_expired():
cache = Cache()
cache.set("permanent", "keep")
cache.set("temporary", "gone", ttl=1)
time.sleep(1.1)
values = cache.get_all()
assert "keep" in values
assert "gone" not in values
cache.stop()
def test_overwrite_key():
cache = Cache()
cache.set("key", "old")
cache.set("key", "new")
assert cache.get("key") == "new"
cache.stop()
def test_cleanup_removes_expired():
cache = Cache(cleanup_interval=9999)
cache.set("exp", "x", ttl=1)
time.sleep(1.1)
cache.cleanup()
assert cache.get("exp") is None
cache.stop()