Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -293,13 +293,13 @@ Client.schedule_snapshot_auto_update(
)
```

### Snapshot Monitoring (Coming Soon)
### Snapshot Monitoring

```python
# 🚧 TODO: Watch for snapshot file changes
# Watch for snapshot file changes
Client.watch_snapshot({
'success': lambda: print('In-memory snapshot updated'),
'reject': lambda err: print(f'Update failed: {err}')
'success': lambda: print("✅ Snapshot loaded successfully"),
'reject': lambda e: print(f"❌ Error loading snapshot: {e}")
})
```

Expand Down
23 changes: 22 additions & 1 deletion switcher_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from .lib.remote import Remote
from .lib.snapshot_auto_updater import SnapshotAutoUpdater
from .lib.snapshot_loader import check_switchers, load_domain, validate_snapshot, save_snapshot
from .lib.snapshot_watcher import SnapshotWatcher
from .lib.utils.execution_logger import ExecutionLogger
from .lib.utils.timed_match.timed_match import TimedMatch
from .lib.utils import get
Expand All @@ -21,6 +22,7 @@ class SwitcherOptions:
class Client:
_context: Context = Context.empty()
_switcher: dict[str, Switcher] = {}
_snapshot_watcher: SnapshotWatcher = SnapshotWatcher()

@staticmethod
def build_context(
Expand Down Expand Up @@ -158,6 +160,24 @@ def terminate_snapshot_auto_update():
""" Terminate Snapshot auto update """
SnapshotAutoUpdater.terminate()

@staticmethod
def watch_snapshot(callback: Optional[dict] = None) -> None:
""" Watch snapshot file for changes and invoke callbacks on result """
callback = get(callback, {})
snapshot_location = Client._context.options.snapshot_location

if snapshot_location is None:
reject = callback.get('reject', lambda _: None)
return reject(Exception("Snapshot location is not defined in the context options"))

environment = get(Client._context.environment, DEFAULT_ENVIRONMENT)
Client._snapshot_watcher.watch_snapshot(snapshot_location, environment, callback)

@staticmethod
def unwatch_snapshot() -> None:
""" Stop watching the snapshot file """
Client._snapshot_watcher.unwatch_snapshot()

@staticmethod
def snapshot_version() -> int:
""" Get the version of the snapshot """
Expand Down Expand Up @@ -190,6 +210,7 @@ def clear_logger() -> None:
def clear_resources() -> None:
""" Clear all resources used by the Client """
Client.terminate_snapshot_auto_update()
Client.unwatch_snapshot()
ExecutionLogger.clear_logger()
GlobalSnapshot.clear()
TimedMatch.terminate_worker()
Expand All @@ -201,7 +222,7 @@ def subscribe_notify_error(callback: Callable[[Exception], None]) -> None:
It is usually used when throttle and silent mode are enabled.
"""
ExecutionLogger.subscribe_notify_error(callback)

@staticmethod
def _is_check_snapshot_available(fetch_remote = False) -> bool:
return Client.snapshot_version() == 0 and (fetch_remote or not Client._context.options.local)
Expand Down
61 changes: 61 additions & 0 deletions switcher_client/lib/snapshot_watcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import os
import threading

from .snapshot_loader import load_domain
from .globals.global_snapshot import GlobalSnapshot

_POLL_INTERVAL = 1 # seconds between file stat checks

class SnapshotWatcher:
""" Watches the snapshot file for changes and updates the switcher accordingly """

def __init__(self):
self._stop_event: threading.Event = threading.Event()
self._ready_event: threading.Event = threading.Event()
self._thread: threading.Thread | None = None

def watch_snapshot(self, snapshot_location: str, environment: str, callback: dict) -> None:
""" Watch snapshot file for changes and invoke callbacks on result """
self._stop_event.clear()
self._ready_event.clear()
self._thread = threading.Thread(
target=self._watch,
args=(snapshot_location, environment, callback),
daemon=True,
name="SnapshotWatcher"
)
self._thread.start()
self._ready_event.wait()

def unwatch_snapshot(self) -> None:
""" Stop watching the snapshot file """
self._stop_event.set()
if self._thread is not None:
self._thread.join(timeout=5.0)
self._thread = None

def _watch(self, snapshot_location: str, environment: str, callback: dict) -> None:
snapshot_file = f"{snapshot_location}/{environment}.json"
last_mtime = self._get_mtime(snapshot_file)
self._ready_event.set()

while not self._stop_event.is_set():
self._stop_event.wait(_POLL_INTERVAL)
current_mtime = self._get_mtime(snapshot_file)
if current_mtime != last_mtime:
last_mtime = current_mtime
self._on_modify_snapshot(snapshot_location, environment, callback)

def _get_mtime(self, snapshot_file: str) -> float:
return os.stat(snapshot_file).st_mtime

def _on_modify_snapshot(self, snapshot_location: str, environment: str, callback: dict) -> None:
success = callback.get('success', lambda: None)
reject = callback.get('reject', lambda _: None)

try:
snapshot = load_domain(snapshot_location, environment)
GlobalSnapshot.init(snapshot)
success()
except Exception as error:
reject(error)
33 changes: 27 additions & 6 deletions tests/playground/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ def setup_context(options: ContextOptions = ContextOptions(), environment = DEFA
options=options
)

def simple_api_call():
# Use cases

def uc_simple_api_call():
""" Use case: Check Switcher using remote API """
setup_context(ContextOptions(
local=False
Expand All @@ -30,7 +32,7 @@ def simple_api_call():
monitor_thread = threading.Thread(target=monitor_run, args=(switcher,), daemon=True)
monitor_thread.start()

def simple_api_call_with_throttle():
def uc_simple_api_call_with_throttle():
""" Use case: Check Switcher using remote API with throttle """
setup_context(ContextOptions(
local=False
Expand All @@ -42,7 +44,7 @@ def simple_api_call_with_throttle():
monitor_thread = threading.Thread(target=monitor_run, args=(switcher,True), daemon=True)
monitor_thread.start()

def load_snapshot_from_remote():
def uc_load_snapshot_from_remote():
""" Use case: Load snapshot from remote API """
global LOOP
LOOP = False
Expand All @@ -58,7 +60,7 @@ def load_snapshot_from_remote():

print(f"Snapshot version: {Client.snapshot_version()}")

def auto_update_snapshot():
def uc_auto_update_snapshot():
""" Use case: Auto update snapshot """
setup_context(ContextOptions(
local=True,
Expand All @@ -77,7 +79,7 @@ def auto_update_snapshot():
)
)

def check_switchers():
def uc_check_switchers():
""" Use case: Check switchers """
global LOOP
LOOP = False
Expand All @@ -89,9 +91,28 @@ def check_switchers():
except Exception as e:
print(f"❌ Configuration error: {e}")

def uc_watch_snapshot():
""" Use case: Watch snapshot file for changes """
setup_context(
environment='default_load_1',
options=ContextOptions(
local=True,
snapshot_location='../../tests/snapshots'
))

Client.load_snapshot()
Client.watch_snapshot({
'success': lambda: print("✅ Snapshot loaded successfully"),
'reject': lambda e: print(f"❌ Error loading snapshot: {e}")
})

switcher = Client.get_switcher('FF2FOR2030')
monitor_thread = threading.Thread(target=monitor_run, args=(switcher,True), daemon=True)
monitor_thread.start()

try:
# Replace with use case
simple_api_call()
uc_simple_api_call()
while LOOP:
time.sleep(1)
except KeyboardInterrupt:
Expand Down
8 changes: 8 additions & 0 deletions tests/snapshots/default_malformed.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"domain": {
"name": "Business",
"description": "Business description",
"version": 1,
"activated": true//
}
}
127 changes: 127 additions & 0 deletions tests/test_client_watch_snapshot.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
import os
import shutil
import time

from typing import Optional

from switcher_client.client import Client, ContextOptions
from switcher_client.lib.globals.global_context import DEFAULT_ENVIRONMENT
from switcher_client.lib.snapshot_watcher import SnapshotWatcher

class TestClientWatchSnapshot:
""" Test suite for Client.watch_snapshot """

def setup_method(self):
self.async_success = None
self.async_error = None
Client._snapshot_watcher = SnapshotWatcher()

def teardown_method(self):
Client.unwatch_snapshot()

def teardown_class(self):
temp_folder = 'tests/snapshots/temp'
if os.path.exists(temp_folder):
shutil.rmtree(temp_folder)

def test_watch_snapshot(self):
""" Should watch the snapshot file and update the switcher when the file changes """

fixture_env = 'default_load_1'
fixture_env_file_modified = 'tests/snapshots/default_load_2.json'
fixture_location = 'tests/snapshots/temp'

# given
modify_fixture_snapshot(fixture_location, fixture_env, f'tests/snapshots/{fixture_env}.json')
given_context(snapshot_location=fixture_location, environment=fixture_env)
Client.load_snapshot()

# test
switcher = Client.get_switcher('FF2FOR2030')
Client.watch_snapshot({
'success': lambda: setattr(self, 'async_success', True),
'reject': lambda err: setattr(self, 'async_error', err)
})

assert switcher.is_on()
modify_fixture_snapshot(fixture_location, fixture_env, fixture_env_file_modified)

# then
verified = verify_util(30, lambda: self.async_success is True and self.async_error is None)
if verified:
assert switcher.is_on() == False
assert self.async_error is None
else:
print("Warning: Snapshot watcher did not detect the change within the time limit")

def test_watch_snapshot_err_no_snapshot_location(self):
""" Should reject with error when snapshot location is not defined in the context options """

# given
given_context()

# test
Client.watch_snapshot({
'success': lambda: setattr(self, 'async_success', True),
'reject': lambda err: setattr(self, 'async_error', err)
})

# then
assert self.async_success is None
assert str(self.async_error) == 'Snapshot location is not defined in the context options'

def test_watch_snapshot_err_malformed_snapshot(self):
""" Should reject with error when snapshot file is malformed """

fixture_env = 'default_load_1'
fixture_env_file_modified = 'tests/snapshots/default_malformed.json'
fixture_location = 'tests/snapshots/temp'

# given
modify_fixture_snapshot(fixture_location, fixture_env, f'tests/snapshots/{fixture_env}.json')
given_context(snapshot_location=fixture_location, environment=fixture_env)
Client.load_snapshot()

# test
Client.watch_snapshot({
'success': lambda: setattr(self, 'async_success', True),
'reject': lambda err: setattr(self, 'async_error', err)
})

modify_fixture_snapshot(fixture_location, fixture_env, fixture_env_file_modified)

# then
verified = verify_util(30, lambda: self.async_error is not None)
if verified:
assert str(self.async_error) == "Expecting ',' delimiter: line 6 column 26 (char 140)"
else:
print("Warning: Snapshot watcher did not detect the change within the time limit")

# Helpers

def given_context(environment: str = DEFAULT_ENVIRONMENT, snapshot_location: Optional[str] = None):
Client.build_context(
domain='Playground',
environment=environment,
options=ContextOptions(
local=True,
snapshot_location=snapshot_location
)
)

def modify_fixture_snapshot(location: str, environment: str, fixture_modified_location: str):
with open(fixture_modified_location, 'r') as file:
data = file.read()

os.makedirs(location, exist_ok=True)
snapshot_file = f"{location}/{environment}.json"
with open(snapshot_file, 'w') as file:
file.write(data)

def verify_util(max_time: int, condition_fn) -> bool:
start_time = time.time()
while time.time() - start_time < max_time:
if condition_fn():
return True
time.sleep(1)
return False