diff --git a/README.md b/README.md index 53416cd..4088d80 100644 --- a/README.md +++ b/README.md @@ -293,13 +293,13 @@ Client.schedule_snapshot_auto_update( ) ``` -### Snapshot Monitoring (Coming Soon) +### Snapshot Monitoring ```python -# 🚧 TODO: Watch for snapshot file changes +# Watch for snapshot file changes Client.watch_snapshot({ - 'success': lambda: print('In-memory snapshot updated'), - 'reject': lambda err: print(f'Update failed: {err}') + 'success': lambda: print("✅ Snapshot loaded successfully"), + 'reject': lambda e: print(f"❌ Error loading snapshot: {e}") }) ``` diff --git a/switcher_client/client.py b/switcher_client/client.py index 980ee1e..01f35c8 100644 --- a/switcher_client/client.py +++ b/switcher_client/client.py @@ -7,6 +7,7 @@ from .lib.remote import Remote from .lib.snapshot_auto_updater import SnapshotAutoUpdater from .lib.snapshot_loader import check_switchers, load_domain, validate_snapshot, save_snapshot +from .lib.snapshot_watcher import SnapshotWatcher from .lib.utils.execution_logger import ExecutionLogger from .lib.utils.timed_match.timed_match import TimedMatch from .lib.utils import get @@ -21,6 +22,7 @@ class SwitcherOptions: class Client: _context: Context = Context.empty() _switcher: dict[str, Switcher] = {} + _snapshot_watcher: SnapshotWatcher = SnapshotWatcher() @staticmethod def build_context( @@ -158,6 +160,24 @@ def terminate_snapshot_auto_update(): """ Terminate Snapshot auto update """ SnapshotAutoUpdater.terminate() + @staticmethod + def watch_snapshot(callback: Optional[dict] = None) -> None: + """ Watch snapshot file for changes and invoke callbacks on result """ + callback = get(callback, {}) + snapshot_location = Client._context.options.snapshot_location + + if snapshot_location is None: + reject = callback.get('reject', lambda _: None) + return reject(Exception("Snapshot location is not defined in the context options")) + + environment = get(Client._context.environment, DEFAULT_ENVIRONMENT) + Client._snapshot_watcher.watch_snapshot(snapshot_location, environment, callback) + + @staticmethod + def unwatch_snapshot() -> None: + """ Stop watching the snapshot file """ + Client._snapshot_watcher.unwatch_snapshot() + @staticmethod def snapshot_version() -> int: """ Get the version of the snapshot """ @@ -190,6 +210,7 @@ def clear_logger() -> None: def clear_resources() -> None: """ Clear all resources used by the Client """ Client.terminate_snapshot_auto_update() + Client.unwatch_snapshot() ExecutionLogger.clear_logger() GlobalSnapshot.clear() TimedMatch.terminate_worker() @@ -201,7 +222,7 @@ def subscribe_notify_error(callback: Callable[[Exception], None]) -> None: It is usually used when throttle and silent mode are enabled. """ ExecutionLogger.subscribe_notify_error(callback) - + @staticmethod def _is_check_snapshot_available(fetch_remote = False) -> bool: return Client.snapshot_version() == 0 and (fetch_remote or not Client._context.options.local) diff --git a/switcher_client/lib/snapshot_watcher.py b/switcher_client/lib/snapshot_watcher.py new file mode 100644 index 0000000..51a9224 --- /dev/null +++ b/switcher_client/lib/snapshot_watcher.py @@ -0,0 +1,61 @@ +import os +import threading + +from .snapshot_loader import load_domain +from .globals.global_snapshot import GlobalSnapshot + +_POLL_INTERVAL = 1 # seconds between file stat checks + +class SnapshotWatcher: + """ Watches the snapshot file for changes and updates the switcher accordingly """ + + def __init__(self): + self._stop_event: threading.Event = threading.Event() + self._ready_event: threading.Event = threading.Event() + self._thread: threading.Thread | None = None + + def watch_snapshot(self, snapshot_location: str, environment: str, callback: dict) -> None: + """ Watch snapshot file for changes and invoke callbacks on result """ + self._stop_event.clear() + self._ready_event.clear() + self._thread = threading.Thread( + target=self._watch, + args=(snapshot_location, environment, callback), + daemon=True, + name="SnapshotWatcher" + ) + self._thread.start() + self._ready_event.wait() + + def unwatch_snapshot(self) -> None: + """ Stop watching the snapshot file """ + self._stop_event.set() + if self._thread is not None: + self._thread.join(timeout=5.0) + self._thread = None + + def _watch(self, snapshot_location: str, environment: str, callback: dict) -> None: + snapshot_file = f"{snapshot_location}/{environment}.json" + last_mtime = self._get_mtime(snapshot_file) + self._ready_event.set() + + while not self._stop_event.is_set(): + self._stop_event.wait(_POLL_INTERVAL) + current_mtime = self._get_mtime(snapshot_file) + if current_mtime != last_mtime: + last_mtime = current_mtime + self._on_modify_snapshot(snapshot_location, environment, callback) + + def _get_mtime(self, snapshot_file: str) -> float: + return os.stat(snapshot_file).st_mtime + + def _on_modify_snapshot(self, snapshot_location: str, environment: str, callback: dict) -> None: + success = callback.get('success', lambda: None) + reject = callback.get('reject', lambda _: None) + + try: + snapshot = load_domain(snapshot_location, environment) + GlobalSnapshot.init(snapshot) + success() + except Exception as error: + reject(error) \ No newline at end of file diff --git a/tests/playground/index.py b/tests/playground/index.py index ebe7d63..b8e12ea 100644 --- a/tests/playground/index.py +++ b/tests/playground/index.py @@ -19,7 +19,9 @@ def setup_context(options: ContextOptions = ContextOptions(), environment = DEFA options=options ) -def simple_api_call(): +# Use cases + +def uc_simple_api_call(): """ Use case: Check Switcher using remote API """ setup_context(ContextOptions( local=False @@ -30,7 +32,7 @@ def simple_api_call(): monitor_thread = threading.Thread(target=monitor_run, args=(switcher,), daemon=True) monitor_thread.start() -def simple_api_call_with_throttle(): +def uc_simple_api_call_with_throttle(): """ Use case: Check Switcher using remote API with throttle """ setup_context(ContextOptions( local=False @@ -42,7 +44,7 @@ def simple_api_call_with_throttle(): monitor_thread = threading.Thread(target=monitor_run, args=(switcher,True), daemon=True) monitor_thread.start() -def load_snapshot_from_remote(): +def uc_load_snapshot_from_remote(): """ Use case: Load snapshot from remote API """ global LOOP LOOP = False @@ -58,7 +60,7 @@ def load_snapshot_from_remote(): print(f"Snapshot version: {Client.snapshot_version()}") -def auto_update_snapshot(): +def uc_auto_update_snapshot(): """ Use case: Auto update snapshot """ setup_context(ContextOptions( local=True, @@ -77,7 +79,7 @@ def auto_update_snapshot(): ) ) -def check_switchers(): +def uc_check_switchers(): """ Use case: Check switchers """ global LOOP LOOP = False @@ -89,9 +91,28 @@ def check_switchers(): except Exception as e: print(f"❌ Configuration error: {e}") +def uc_watch_snapshot(): + """ Use case: Watch snapshot file for changes """ + setup_context( + environment='default_load_1', + options=ContextOptions( + local=True, + snapshot_location='../../tests/snapshots' + )) + + Client.load_snapshot() + Client.watch_snapshot({ + 'success': lambda: print("✅ Snapshot loaded successfully"), + 'reject': lambda e: print(f"❌ Error loading snapshot: {e}") + }) + + switcher = Client.get_switcher('FF2FOR2030') + monitor_thread = threading.Thread(target=monitor_run, args=(switcher,True), daemon=True) + monitor_thread.start() + try: # Replace with use case - simple_api_call() + uc_simple_api_call() while LOOP: time.sleep(1) except KeyboardInterrupt: diff --git a/tests/snapshots/default_malformed.json b/tests/snapshots/default_malformed.json new file mode 100644 index 0000000..1072259 --- /dev/null +++ b/tests/snapshots/default_malformed.json @@ -0,0 +1,8 @@ +{ + "domain": { + "name": "Business", + "description": "Business description", + "version": 1, + "activated": true// + } +} \ No newline at end of file diff --git a/tests/test_client_watch_snapshot.py b/tests/test_client_watch_snapshot.py new file mode 100644 index 0000000..cbd8960 --- /dev/null +++ b/tests/test_client_watch_snapshot.py @@ -0,0 +1,127 @@ +import os +import shutil +import time + +from typing import Optional + +from switcher_client.client import Client, ContextOptions +from switcher_client.lib.globals.global_context import DEFAULT_ENVIRONMENT +from switcher_client.lib.snapshot_watcher import SnapshotWatcher + +class TestClientWatchSnapshot: + """ Test suite for Client.watch_snapshot """ + + def setup_method(self): + self.async_success = None + self.async_error = None + Client._snapshot_watcher = SnapshotWatcher() + + def teardown_method(self): + Client.unwatch_snapshot() + + def teardown_class(self): + temp_folder = 'tests/snapshots/temp' + if os.path.exists(temp_folder): + shutil.rmtree(temp_folder) + + def test_watch_snapshot(self): + """ Should watch the snapshot file and update the switcher when the file changes """ + + fixture_env = 'default_load_1' + fixture_env_file_modified = 'tests/snapshots/default_load_2.json' + fixture_location = 'tests/snapshots/temp' + + # given + modify_fixture_snapshot(fixture_location, fixture_env, f'tests/snapshots/{fixture_env}.json') + given_context(snapshot_location=fixture_location, environment=fixture_env) + Client.load_snapshot() + + # test + switcher = Client.get_switcher('FF2FOR2030') + Client.watch_snapshot({ + 'success': lambda: setattr(self, 'async_success', True), + 'reject': lambda err: setattr(self, 'async_error', err) + }) + + assert switcher.is_on() + modify_fixture_snapshot(fixture_location, fixture_env, fixture_env_file_modified) + + # then + verified = verify_util(30, lambda: self.async_success is True and self.async_error is None) + if verified: + assert switcher.is_on() == False + assert self.async_error is None + else: + print("Warning: Snapshot watcher did not detect the change within the time limit") + + def test_watch_snapshot_err_no_snapshot_location(self): + """ Should reject with error when snapshot location is not defined in the context options """ + + # given + given_context() + + # test + Client.watch_snapshot({ + 'success': lambda: setattr(self, 'async_success', True), + 'reject': lambda err: setattr(self, 'async_error', err) + }) + + # then + assert self.async_success is None + assert str(self.async_error) == 'Snapshot location is not defined in the context options' + + def test_watch_snapshot_err_malformed_snapshot(self): + """ Should reject with error when snapshot file is malformed """ + + fixture_env = 'default_load_1' + fixture_env_file_modified = 'tests/snapshots/default_malformed.json' + fixture_location = 'tests/snapshots/temp' + + # given + modify_fixture_snapshot(fixture_location, fixture_env, f'tests/snapshots/{fixture_env}.json') + given_context(snapshot_location=fixture_location, environment=fixture_env) + Client.load_snapshot() + + # test + Client.watch_snapshot({ + 'success': lambda: setattr(self, 'async_success', True), + 'reject': lambda err: setattr(self, 'async_error', err) + }) + + modify_fixture_snapshot(fixture_location, fixture_env, fixture_env_file_modified) + + # then + verified = verify_util(30, lambda: self.async_error is not None) + if verified: + assert str(self.async_error) == "Expecting ',' delimiter: line 6 column 26 (char 140)" + else: + print("Warning: Snapshot watcher did not detect the change within the time limit") + +# Helpers + +def given_context(environment: str = DEFAULT_ENVIRONMENT, snapshot_location: Optional[str] = None): + Client.build_context( + domain='Playground', + environment=environment, + options=ContextOptions( + local=True, + snapshot_location=snapshot_location + ) + ) + +def modify_fixture_snapshot(location: str, environment: str, fixture_modified_location: str): + with open(fixture_modified_location, 'r') as file: + data = file.read() + + os.makedirs(location, exist_ok=True) + snapshot_file = f"{location}/{environment}.json" + with open(snapshot_file, 'w') as file: + file.write(data) + +def verify_util(max_time: int, condition_fn) -> bool: + start_time = time.time() + while time.time() - start_time < max_time: + if condition_fn(): + return True + time.sleep(1) + return False \ No newline at end of file