Feat: partial refactoring, issues detected by pyscn#266
Conversation
Note: Reviews paused. It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior in the repository settings.
No actionable comments were generated in the recent review. 🎉
ℹ️ Recent review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID:
📒 Files selected for processing (1)
📝 Walkthrough
Consolidates execution fields into an ExecutionSpec mixin; replaces the Redis pub/sub SSERedisBus with a Redis Streams-based SSEService and updates providers/tests/handlers; centralizes worker startup with a new run_worker bootstrap used by run_* entrypoints; removes SagaInstance and trims exported user-settings docs/exports; adds/relocates validators and schema fields.
Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Main as Worker main()
    participant Runner as run_worker()
    participant Settings as Settings
    participant Beanie as Beanie (Mongo)
    participant Container as DI Container
    participant Broker as KafkaBroker
    participant Startup as on_startup hook
    participant FastStream as FastStream
    participant Shutdown as on_shutdown hook
    Main->>Runner: call run_worker(name, config_override, factory, register_handlers?, hooks?)
    Runner->>Settings: load configuration
    Runner->>Beanie: initialize Beanie / ALL_DOCUMENTS
    Runner->>Container: create DI container via factory
    Container->>Broker: resolve KafkaBroker
    Runner->>Runner: optionally call register_handlers(container, broker)
    Runner->>Startup: invoke on_startup(container, broker, logger)
    Startup->>Container: resolve services and schedule jobs
    Runner->>FastStream: configure & start FastStream with lifecycle hooks
    FastStream->>Shutdown: invoke on_shutdown() on termination
    Shutdown->>Container: shutdown schedulers and cleanup
    Runner->>Container: close container
    Runner->>Main: return / exit
```
Estimated code review effort: 🎯 5 (Critical) | ⏱️ ~120 minutes
Possibly related PRs
🚥 Pre-merge checks: ✅ 1 passed | ❌ 2 failed
❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (1 passed)
Codecov Report
❌ Patch coverage is
Flags with carried forward coverage won't be shown.
... and 12 files with indirect coverage changes
3 issues found across 23 files
Prompt for AI agents (unresolved issues)
Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.
<file name="backend/workers/bootstrap.py">
<violation number="1" location="backend/workers/bootstrap.py:54">
P2: AsyncMongoClient is created but never closed; add a shutdown hook to close the client so worker shutdowns release Mongo connections cleanly.</violation>
</file>
<file name="backend/app/schemas_pydantic/user_settings.py">
<violation number="1" location="backend/app/schemas_pydantic/user_settings.py:4">
P1: Validation constraints for editor settings were removed without replacement, allowing invalid `font_size`/`tab_size` values to be accepted.</violation>
</file>
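Since the API-schema validators were removed, the service-layer check the PR adds is the only guard. A minimal stdlib sketch of such a check follows; the real `_validate_editor_settings` lives in `user_settings_service.py`, and this standalone signature is an assumption for illustration.

```python
# Hypothetical standalone version of the service-layer editor-settings check.
# The 8-32 font range and {2, 4, 8} tab sizes come from the review comments.
ALLOWED_TAB_SIZES = {2, 4, 8}

def validate_editor_settings(font_size: int, tab_size: int) -> None:
    """Reject editor settings outside the documented ranges."""
    if not 8 <= font_size <= 32:
        raise ValueError(f"font_size must be between 8 and 32, got {font_size}")
    if tab_size not in ALLOWED_TAB_SIZES:
        raise ValueError(f"tab_size must be one of {sorted(ALLOWED_TAB_SIZES)}, got {tab_size}")
```

Keeping this check in the service layer while also restoring `Field` constraints (as suggested later in the thread) gives both accurate OpenAPI docs and defense in depth.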
<file name="backend/workers/run_saga_orchestrator.py">
<violation number="1" location="backend/workers/run_saga_orchestrator.py:11">
P2: Initialize AsyncIOScheduler inside the running event loop. Creating it at module import time happens before `asyncio.run()` builds the loop, which APScheduler documents can prevent the scheduler from running. Move scheduler creation into `_on_startup` (or pass the current loop) instead of a module-level singleton.</violation>
</file>
Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.
Pull request overview
This PR introduces a shared worker bootstrap (run_worker) to centralize common initialization (settings/logging/Beanie/Dishka/FastStream), updates worker entrypoints and architecture docs accordingly, and includes a few related refactors (event field mixins, settings validation, plotting helper extraction).
Changes:
- Added `backend/workers/bootstrap.py` and migrated all worker `run_*.py` entrypoints to use `run_worker()`.
- Refactored execution/pod command event models to reuse a shared `ExecutionSpec` mixin; updated docs to describe the pattern.
- Adjusted validation/refactors in services and utilities (user settings editor validation, notification helper methods, event replay helpers, load-test plotting helper).
Reviewed changes
Copilot reviewed 23 out of 23 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| docs/components/workers/index.md | Documents shared worker bootstrap location and lifecycle link. |
| docs/architecture/services-overview.md | Updates deployed worker overview to reflect shared bootstrap. |
| docs/architecture/lifecycle.md | Rewrites worker entrypoint/lifecycle guidance to match run_worker() pattern. |
| docs/architecture/event-system-design.md | Documents shared Pydantic mixin approach for event fields. |
| backend/workers/run_saga_orchestrator.py | Migrates saga orchestrator worker entrypoint to run_worker() + hooks. |
| backend/workers/run_result_processor.py | Migrates result processor worker entrypoint to run_worker(). |
| backend/workers/run_pod_monitor.py | Migrates pod monitor worker entrypoint to run_worker() + APScheduler startup/shutdown hooks. |
| backend/workers/run_k8s_worker.py | Migrates k8s worker entrypoint to run_worker() + startup hook. |
| backend/workers/run_event_replay.py | Migrates event replay worker entrypoint to run_worker() + APScheduler hooks. |
| backend/workers/run_dlq_processor.py | Migrates DLQ processor entrypoint to run_worker() + APScheduler hooks. |
| backend/workers/bootstrap.py | New shared bootstrap that initializes logging, Beanie, Dishka+FastStream, and worker hooks. |
| backend/tests/load/plot_report.py | Extracts shared bar chart saving logic to a helper. |
| backend/app/services/user_settings_service.py | Adds service-layer validation for editor settings (font/tab sizes). |
| backend/app/services/saga/__init__.py | Removes re-export of deleted SagaInstance. |
| backend/app/services/notification_service.py | Extracts scheduled-time validation and throttle check into helpers. |
| backend/app/services/event_replay/replay_service.py | Extracts helper methods to simplify replay dispatch control flow. |
| backend/app/schemas_pydantic/user_settings.py | Removes Pydantic field validators for editor settings in API schemas. |
| backend/app/domain/saga/models.py | Deletes unused SagaInstance dataclass. |
| backend/app/domain/saga/__init__.py | Removes SagaInstance from exports. |
| backend/app/domain/events/typed.py | Introduces ExecutionSpec mixin and applies it to relevant events. |
| backend/app/db/docs/user_settings.py | Removes embedded-document validators and simplifies docstrings. |
| backend/app/db/docs/saga.py | Simplifies SagaDocument docstring after SagaInstance removal. |
| backend/app/db/docs/__init__.py | Stops exporting embedded settings models from docs package. |
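The `ExecutionSpec` mixin pattern listed in the table can be illustrated with plain dataclasses. The real code uses Pydantic `BaseModel`, and field names other than `runtime_command` are made up for this sketch.

```python
from dataclasses import dataclass, field

@dataclass
class ExecutionSpec:
    """Shared execution fields mixed into multiple event types."""
    runtime_command: list[str] = field(default_factory=list)
    timeout_seconds: int = 300  # illustrative field, not from the PR

@dataclass
class BaseEvent:
    event_type: str = "base"

# Both events inherit the shared spec fields plus the base event fields,
# mirroring `class ExecutionRequestedEvent(BaseEvent, ExecutionSpec)` in the PR.
@dataclass
class ExecutionRequestedEvent(ExecutionSpec, BaseEvent):
    event_type: str = "execution.requested"

@dataclass
class CreatePodCommandEvent(ExecutionSpec, BaseEvent):
    event_type: str = "pod.create"
```

Because the mixin carries only fields (no event identity), duplicating the spec across event classes is avoided without tangling the event-type hierarchy.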
Actionable comments posted: 11
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
backend/app/domain/events/typed.py (1)
61-83: ⚠️ Potential issue | 🟡 Minor
Remove banner/separator comments from this module.
Line 61 and Line 82 add `# --- ... ---` separators, which violate the repo’s Python style rule.

Suggested cleanup:

```diff
-# --- Execution Spec (shared fields between ExecutionRequestedEvent and CreatePodCommandEvent) ---
-
 class ExecutionSpec(BaseModel):
     """Shared execution specification fields (mixin for ExecutionRequestedEvent and CreatePodCommandEvent)."""
@@
-# --- Execution Events ---
-
 class ExecutionRequestedEvent(BaseEvent, ExecutionSpec):
```

As per coding guidelines: Never use banner/separator comments (`# ---`, `# ===`, `# ***`); use classes, blank lines, and docstrings to organize code.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/domain/events/typed.py` around lines 61 - 83, Remove the banner/separator comments surrounding the ExecutionSpec declaration: delete the lines containing "# --- Execution Spec (shared fields between ExecutionRequestedEvent and CreatePodCommandEvent) ---" and "# --- Execution Events ---" and instead rely on the ExecutionSpec class docstring, a single blank line separation, or nearby class definitions (e.g., ExecutionRequestedEvent, CreatePodCommandEvent) to structure the module; ensure no other banner-style comments remain in typed.py and keep the ExecutionSpec class and its fields unchanged.
🧹 Nitpick comments (3)
backend/app/schemas_pydantic/user_settings.py (1)
21-30: Consider the trade-offs of removing Pydantic validators.
Moving validation from Pydantic to the service layer (`_validate_editor_settings` in `user_settings_service.py`) is a valid architectural choice, but note:

- OpenAPI schema impact: The generated API docs will no longer show `font_size` (8-32) and `tab_size` (2, 4, 8) constraints, potentially affecting API consumers.
- Error response timing: Validation errors now occur deeper in the request lifecycle rather than at deserialization.

If this is intentional for centralized validation control, consider adding `Field(ge=8, le=32)` for `font_size` and `Literal[2, 4, 8]` for `tab_size` to preserve OpenAPI documentation while keeping service-level validation as the enforcement layer.

📝 Optional: Restore constraints for OpenAPI documentation

```diff
+from typing import Literal
+
 class EditorSettings(BaseModel):
     """Code editor preferences"""

     model_config = ConfigDict(from_attributes=True, json_schema_serialization_defaults_required=True)

-    font_size: int = 14
-    tab_size: int = 4
+    font_size: int = Field(default=14, ge=8, le=32)
+    tab_size: Literal[2, 4, 8] = 4
     use_tabs: bool = False
     word_wrap: bool = True
     show_line_numbers: bool = True
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/schemas_pydantic/user_settings.py` around lines 21 - 30, Add explicit Pydantic constraints back to the EditorSettings model to preserve OpenAPI docs: change the font_size and tab_size annotations in EditorSettings to use Field(ge=8, le=32) for font_size and typing.Literal[2, 4, 8] for tab_size so the schema reflects the allowed ranges/options, while still keeping the deeper enforcement in _validate_editor_settings in user_settings_service.py; this keeps documentation accurate without removing your centralized service-layer validation.

backend/workers/bootstrap.py (1)

51-51: Use structured log kwargs instead of f-strings.
These logs should pass `worker_name` as a keyword field rather than interpolating into the message string.
As per coding guidelines: "Use keyword arguments for all log statements — never interpolate user-controlled data into log messages."

Proposed fix:

```diff
-    logger.info(f"Starting {worker_name}...")
+    logger.info("Starting worker", worker_name=worker_name)
 ...
-    logger.info(f"{worker_name} shutdown complete")
+    logger.info("Worker shutdown complete", worker_name=worker_name)
```

Also applies to: 81-81
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/workers/bootstrap.py` at line 51, Replace the f-string interpolation in the logger calls with structured keyword fields: instead of logger.info(f"Starting {worker_name}...") call logger.info("Starting worker", worker_name=worker_name). Do the same for the other f-string log at the other logger.info occurrence (the second logger.info that interpolates worker_name): pass worker_name as a keyword argument rather than embedding it in the message.

docs/architecture/event-system-design.md (1)
64-65: Clarify “optional” wording for `runtime_command`.
The current sentence can be read as allowing `None`, but the field is `list[str]` with a default factory. Consider saying it is “omittable at construction” rather than “optional”.
Verify each finding against the current code and only fix it if needed. In `@docs/architecture/event-system-design.md` around lines 64 - 65, The phrase saying `CreatePodCommandEvent` makes `runtime_command` "optional" is ambiguous; update the wording to indicate the field is a list[str] with a default factory and therefore "omittable at construction" (or "has a default empty list via Field(default_factory=list)") rather than implying it can be None; reference `CreatePodCommandEvent`, the `runtime_command` field, and `Field(default_factory=list)` when rewriting the sentence alongside the existing note that mixins inherit from `BaseModel` not `BaseEvent`.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@backend/app/domain/events/typed.py`:
- Around line 424-427: The CreatePodCommandEvent currently sets runtime_command
with Field(default_factory=list), making it silently optional; change the
declaration in the CreatePodCommandEvent class to require runtime_command
(remove Field and default_factory so runtime_command: list[str] is mandatory) to
enforce the invariant that commands are provided (class name:
CreatePodCommandEvent, field: runtime_command). Ensure any callers (e.g., where
CreatePodCommandEvent is constructed in the saga) already pass runtime_command;
update tests/constructors if they relied on the empty-list default.
In `@backend/app/services/event_replay/replay_service.py`:
- Around line 198-200: ReplayConfig.speed_multiplier is unconstrained and can be
zero or negative causing ZeroDivisionError in the replay delay calculation;
update the ReplayConfig model (ReplayConfig.speed_multiplier) to enforce gt=0
(e.g., use Pydantic Field(..., gt=0)) in models.py and also add a runtime guard
in the replay delay logic (the condition that checks
session.config.speed_multiplier < 100) to require
session.config.speed_multiplier > 0 before dividing so the code that computes
time_diff / session.config.speed_multiplier cannot divide by zero or negative
values.
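The guard described above amounts to validating the multiplier before dividing. A plain-Python sketch follows; the real code keeps the constraint in a Pydantic `ReplayConfig` with `Field(gt=0)`, and this helper name is hypothetical.

```python
def replay_delay(time_diff: float, speed_multiplier: float) -> float:
    """Return the inter-event sleep, guarding against non-positive speeds."""
    if speed_multiplier <= 0:
        # Runtime guard backing up the Pydantic-level gt=0 constraint.
        raise ValueError("speed_multiplier must be > 0")
    if speed_multiplier >= 100:
        return 0.0  # fast-forward: the < 100 branch applies no delay
    return time_diff / speed_multiplier
```

With both the model constraint and this guard in place, the `time_diff / speed_multiplier` expression can no longer divide by zero or produce a negative delay.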
In `@backend/app/services/notification_service.py`:
- Line 182: create_notification is calling _check_throttle a second time for
system notifications already rate-checked in _create_system_for_user, causing
two entries in ThrottleCache.check_throttle; add a boolean parameter (e.g.,
throttle_already_checked or skip_internal_throttle, default False) to
create_notification and propagate it to avoid calling self._check_throttle when
True, then update _create_system_for_user to call create_notification(...,
throttle_already_checked=True) (or alternatively detect the "system" source and
skip the second _check_throttle), and ensure only ThrottleCache.check_throttle
is invoked once per notification path.
- Around line 144-149: The warning log in the throttling branch currently
interpolates user-controlled user_id into the message using f-strings; change
the self.logger.warning call in the block where `throttled` is True to use
structured keyword arguments instead (e.g., pass a template string without
interpolation and provide user_id, NOTIF_THROTTLE_MAX_PER_HOUR, and
NOTIF_THROTTLE_WINDOW_HOURS as keyword args), updating the call site that
references `self.logger.warning` and
`self.settings.NOTIF_THROTTLE_MAX_PER_HOUR`/`self.settings.NOTIF_THROTTLE_WINDOW_HOURS`
so sensitive data is not directly formatted into the message.
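The single-throttle-check fix for `create_notification` described above can be sketched as follows. `throttle_already_checked` is the suggested (not actual) parameter name, and the cache here is a counting stub rather than the real `ThrottleCache`.

```python
class ThrottleCache:
    """Counting stub standing in for the real throttle cache."""

    def __init__(self) -> None:
        self.checks = 0

    def check_throttle(self, user_id: str) -> bool:
        self.checks += 1
        return True  # always allowed in this sketch

cache = ThrottleCache()

def create_notification(user_id: str, *, throttle_already_checked: bool = False) -> bool:
    # Skip the internal check when an upstream caller already consulted the cache.
    if not throttle_already_checked and not cache.check_throttle(user_id):
        return False
    return True  # persisting the notification is elided

def create_system_for_user(user_id: str) -> bool:
    if not cache.check_throttle(user_id):  # first (and only) check
        return False
    return create_notification(user_id, throttle_already_checked=True)
```

Each notification path now records exactly one throttle entry, which keeps the hourly counters accurate.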
In `@backend/tests/load/plot_report.py`:
- Line 121: The return statement calling _save_bar_chart exceeds the 120-char
limit; break it across multiple lines so arguments and the long path expression
are wrapped (e.g., assign out_dir / "endpoint_latency.png" to a short variable
or use a parenthesized call with each argument on its own line) and then return
_save_bar_chart(labels, "Success Latency by Endpoint (Top N)", "Latency (ms)",
path_var, bars) so the line lengths stay under 120 characters and references to
_save_bar_chart, labels, bars, and out_dir remain clear.
- Around line 81-100: The helper function _save_bar_chart lacks a Google-style
docstring; add a docstring above the function using Google style that describes
the function purpose and documents Args (labels: List[str], title: str, ylabel:
str, out_path: Path, plot_bars: Callable[[Axes, range], None]), Returns (Path)
and any Raises (e.g., propagation of matplotlib/IO errors if applicable),
include brief descriptions and types for each parameter and the return value,
and keep it concise to match existing backend docstring conventions.
- Around line 6-7: Remove List, Dict, and Tuple from the typing import and
import only TYPE_CHECKING, Callable, TypedDict; then replace all usages of
typing collection aliases with PEP 585 built-ins: change List[...] to list[...],
Dict[...] to dict[...], and Tuple[...] to tuple[...] in the TypedDict
definitions (e.g., fields on EndpointData and any PlotReport TypedDict), in the
local variable annotations inside the functions referenced around lines 51–53,
the return type at line 74 and line 143, the variable annotation at line 75, and
the parameter annotation at line 82 so they use list[int], dict[str,
EndpointData], tuple[...], etc., matching the original types but using built-in
generics; keep Callable and TypedDict imports as needed.
In `@backend/workers/bootstrap.py`:
- Around line 54-80: The AsyncMongoClient instance named client is never closed;
wrap the lifecycle around app.run so client.close() is invoked on shutdown:
create the client via AsyncMongoClient(...) before init_beanie as now, but
ensure you register its close in the worker shutdown path (either add an async
lambda calling client.close() to shutdown_hooks or surround the app.run() call
with a try/finally that calls await client.close()), keeping init_beanie and
container setup intact; reference AsyncMongoClient/client, init_beanie,
shutdown_hooks (or finally after await app.run()), and container.close to locate
where to add the cleanup.
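The suggested cleanup wraps the blocking run in try/finally so the Mongo client is always closed. The sketch below uses no real driver; `FakeMongoClient` only mimics the awaitable `close()` of pymongo's `AsyncMongoClient`.

```python
import asyncio

class FakeMongoClient:
    """Stand-in for AsyncMongoClient with an awaitable close()."""

    def __init__(self) -> None:
        self.closed = False

    async def close(self) -> None:
        self.closed = True

async def run_app(client: FakeMongoClient, fail: bool = False) -> None:
    try:
        if fail:
            raise RuntimeError("worker crashed")
        # app.run() would block here until shutdown
    finally:
        await client.close()  # released on both clean and crashed exits

client = FakeMongoClient()
asyncio.run(run_app(client))
```

Registering `client.close()` in the shutdown hooks achieves the same guarantee if a try/finally around `app.run()` is awkward to thread through the bootstrap.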
In `@backend/workers/run_pod_monitor.py`:
- Around line 41-48: The scheduler interval is set to 5s while the watch cycle
(watch_pod_events / _watch_cycle) can block up to watch_timeout_seconds (30s),
causing skipped runs due to max_instances=1; update the _scheduler.add_job call
(id "pod_monitor_watch", function _watch_cycle) to use an interval equal to the
configured watch_timeout_seconds (or otherwise align it with that config value)
so the schedule reflects the actual blocking duration, or alternatively increase
max_instances and handle concurrency—prefer changing seconds from 5 to the
watch_timeout_seconds config to keep behavior predictable.
In `@backend/workers/run_saga_orchestrator.py`:
- Around line 38-39: The shutdown handler _on_shutdown currently calls
_scheduler.shutdown(wait=False) unconditionally which can raise
SchedulerNotRunningError if _on_startup failed before _scheduler.start(); modify
_on_shutdown to check the scheduler's running property (e.g., if
_scheduler.running:) before calling _scheduler.shutdown(...) so shutdown is only
invoked when the scheduler is running, avoiding the exception during application
shutdown.
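The guarded shutdown above reduces to checking `running` before calling `shutdown()`. APScheduler does expose a `running` property; `StubScheduler` here only mimics that behavior, so treat this as a sketch rather than the real API surface.

```python
class StubScheduler:
    """Mimics APScheduler's running property and not-running error."""

    def __init__(self) -> None:
        self.running = False

    def start(self) -> None:
        self.running = True

    def shutdown(self, wait: bool = True) -> None:
        if not self.running:
            raise RuntimeError("scheduler is not running")
        self.running = False

def on_shutdown(scheduler: StubScheduler) -> bool:
    """Return True if a shutdown was actually performed."""
    if scheduler.running:  # guard prevents SchedulerNotRunningError
        scheduler.shutdown(wait=False)
        return True
    return False
```

This keeps application shutdown clean even when `_on_startup` failed before the scheduler ever started.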
In `@docs/architecture/lifecycle.md`:
- Around line 100-104: Checklist items 3 and 4 repeat "Optionally" — replace the
two separate "Optionally" phrases with a single, concise sentence covering both
actions; for example, change "Optionally register Kafka subscriber handlers" and
"Optionally define `on_startup` / `on_shutdown` callbacks for APScheduler or
other setup" into a single line like "Register Kafka subscriber handlers and/or
define `on_startup` / `on_shutdown` callbacks for APScheduler or other setup as
needed" so the checklist reads smoother and avoids repetition.
---
Outside diff comments:
In `@backend/app/domain/events/typed.py`:
- Around line 61-83: Remove the banner/separator comments surrounding the
ExecutionSpec declaration: delete the lines containing "# --- Execution Spec
(shared fields between ExecutionRequestedEvent and CreatePodCommandEvent) ---"
and "# --- Execution Events ---" and instead rely on the ExecutionSpec class
docstring, a single blank line separation, or nearby class definitions (e.g.,
ExecutionRequestedEvent, CreatePodCommandEvent) to structure the module; ensure
no other banner-style comments remain in typed.py and keep the ExecutionSpec
class and its fields unchanged.
---
Nitpick comments:
In `@backend/app/schemas_pydantic/user_settings.py`:
- Around line 21-30: Add explicit Pydantic constraints back to the
EditorSettings model to preserve OpenAPI docs: change the font_size and tab_size
annotations in EditorSettings to use Field(ge=8, le=32) for font_size and
typing.Literal[2, 4, 8] for tab_size so the schema reflects the allowed
ranges/options, while still keeping the deeper enforcement in
_validate_editor_settings in user_settings_service.py; this keeps documentation
accurate without removing your centralized service-layer validation.
In `@backend/workers/bootstrap.py`:
- Line 51: Replace the f-string interpolation in the logger calls with
structured keyword fields: instead of logger.info(f"Starting {worker_name}...")
call logger.info("Starting worker", worker_name=worker_name). Do the same for
the other f-string log at the other logger.info occurrence (the second
logger.info that interpolates worker_name): pass worker_name as a keyword
argument rather than embedding it in the message.
In `@docs/architecture/event-system-design.md`:
- Around line 64-65: The phrase saying `CreatePodCommandEvent` makes
`runtime_command` "optional" is ambiguous; update the wording to indicate the
field is a list[str] with a default factory and therefore "omittable at
construction" (or "has a default empty list via Field(default_factory=list)")
rather than implying it can be None; reference `CreatePodCommandEvent`, the
`runtime_command` field, and `Field(default_factory=list)` when rewriting the
sentence alongside the existing note that mixins inherit from `BaseModel` not
`BaseEvent`.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 60ea9b8f-1cfa-41e5-8e3d-2244b2b32807
📒 Files selected for processing (23)
- backend/app/db/docs/__init__.py
- backend/app/db/docs/saga.py
- backend/app/db/docs/user_settings.py
- backend/app/domain/events/typed.py
- backend/app/domain/saga/__init__.py
- backend/app/domain/saga/models.py
- backend/app/schemas_pydantic/user_settings.py
- backend/app/services/event_replay/replay_service.py
- backend/app/services/notification_service.py
- backend/app/services/saga/__init__.py
- backend/app/services/user_settings_service.py
- backend/tests/load/plot_report.py
- backend/workers/bootstrap.py
- backend/workers/run_dlq_processor.py
- backend/workers/run_event_replay.py
- backend/workers/run_k8s_worker.py
- backend/workers/run_pod_monitor.py
- backend/workers/run_result_processor.py
- backend/workers/run_saga_orchestrator.py
- docs/architecture/event-system-design.md
- docs/architecture/lifecycle.md
- docs/architecture/services-overview.md
- docs/components/workers/index.md
💤 Files with no reviewable changes (3)
- backend/app/domain/saga/models.py
- backend/app/domain/saga/__init__.py
- backend/app/db/docs/__init__.py
🧹 Nitpick comments (1)
frontend/vitest.setup.ts (1)
101-106: Consider moving `cleanup()` to `afterEach` and adding `vi.clearAllMocks()`.
Per project guidelines, `cleanup()` should be called in `afterEach` (not `beforeEach`) to immediately release DOM resources after each test. Additionally, `vi.clearAllMocks()` should be called in `beforeEach` to reset mock state between tests.

♻️ Suggested refactor:

```diff
-import { vi, beforeEach } from 'vitest';
+import { vi, beforeEach, afterEach } from 'vitest';

 // Reset storage and DOM between every test
 beforeEach(() => {
   Object.keys(localStorageStore).forEach(key => delete localStorageStore[key]);
   Object.keys(sessionStorageStore).forEach(key => delete sessionStorageStore[key]);
-  cleanup();
+  vi.clearAllMocks();
+});
+
+afterEach(() => {
+  cleanup();
 });
```

Based on learnings: "Call `cleanup()` in Vitest `afterEach`, `vi.clearAllMocks()` in `beforeEach`".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@frontend/vitest.setup.ts` around lines 101 - 106, Move the DOM cleanup to an afterEach and add mock clearing in beforeEach: keep the storage reset logic in beforeEach (iterating localStorageStore and sessionStorageStore) but call vi.clearAllMocks() at the start of that beforeEach; remove cleanup() from beforeEach and call cleanup() inside an afterEach to release DOM resources immediately after each test. Ensure you reference the existing beforeEach/afterEach hooks and the cleanup(), vi.clearAllMocks(), localStorageStore and sessionStorageStore identifiers when making the change.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@frontend/vitest.setup.ts`:
- Around line 101-106: Move the DOM cleanup to an afterEach and add mock
clearing in beforeEach: keep the storage reset logic in beforeEach (iterating
localStorageStore and sessionStorageStore) but call vi.clearAllMocks() at the
start of that beforeEach; remove cleanup() from beforeEach and call cleanup()
inside an afterEach to release DOM resources immediately after each test. Ensure
you reference the existing beforeEach/afterEach hooks and the cleanup(),
vi.clearAllMocks(), localStorageStore and sessionStorageStore identifiers when
making the change.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: e7a422e7-ef0a-4d97-ab59-bc427549dccd
📒 Files selected for processing (2)
- frontend/vitest.config.ts
- frontend/vitest.setup.ts
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
frontend/src/lib/__tests__/user-settings.test.ts (1)
6-33: ⚠️ Potential issue | 🟠 Major
Use `vi.hoisted()` to hoist mock declarations before `vi.mock()` calls.
The mock functions defined at lines 3–4, 13, 19, and 25 are declared outside `vi.hoisted()` and used immediately in `vi.mock()` callbacks starting at line 6. This violates the test standard requiring all mocks to be hoisted before any `vi.mock()` declarations.
Consolidate all mock definitions into a single `vi.hoisted()` block at the top of the file before any `vi.mock()` calls:

Suggested refactor:

```diff
+const { mockGetUserSettings, mockUpdateUserSettings, mockSetUserSettings, mockSetTheme, mockAuthStore } =
+  vi.hoisted(() => ({
+    mockGetUserSettings: vi.fn(),
+    mockUpdateUserSettings: vi.fn(),
+    mockSetUserSettings: vi.fn(),
+    mockSetTheme: vi.fn(),
+    mockAuthStore: { isAuthenticated: true as boolean | null },
+  }));
+
-const mockGetUserSettings = vi.fn();
-const mockUpdateUserSettings = vi.fn();
-
 vi.mock('$lib/api', () => ({
   getUserSettingsApiV1UserSettingsGet: (...args: unknown[]) =>
     (mockGetUserSettings as (...a: unknown[]) => unknown)(...args),
   updateUserSettingsApiV1UserSettingsPut: (...args: unknown[]) =>
     (mockUpdateUserSettings as (...a: unknown[]) => unknown)(...args),
 }));
-
-const mockSetUserSettings = vi.fn();
 vi.mock('$stores/userSettings.svelte', () => ({
   setUserSettings: (settings: unknown) => mockSetUserSettings(settings),
 }));
-
-const mockSetTheme = vi.fn();
 vi.mock('$stores/theme.svelte', () => ({
   setTheme: (theme: string) => mockSetTheme(theme),
 }));
-
-const mockAuthStore = {
-  isAuthenticated: true as boolean | null,
-};
 vi.mock('$stores/auth.svelte', () => ({
   authStore: mockAuthStore,
 }));
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@frontend/src/lib/__tests__/user-settings.test.ts` around lines 6 - 33, Move the standalone mock declarations (mockGetUserSettings, mockUpdateUserSettings, mockSetUserSettings, mockSetTheme, and mockAuthStore) into a single vi.hoisted() block placed before any vi.mock() calls so they are hoisted; then update the vi.mock() callbacks (the ones creating getUserSettingsApiV1UserSettingsGet, updateUserSettingsApiV1UserSettingsPut, setUserSettings, setTheme, and authStore) to reference those hoisted mocks instead of locally-scoped vars. Ensure vi.hoisted() returns the mock functions/objects and keep the vi.mock() callbacks unchanged except for using the hoisted identifiers.

frontend/src/stores/__tests__/theme.test.ts (1)
4-12: ⚠️ Potential issue | 🟠 Major
Wrap mocks in `vi.hoisted()` before `vi.mock()` declarations.
Lines 4-12 require hoisting mocks following the established pattern used throughout the test suite. Refactor to:

Suggested structure:

```ts
const mocks = vi.hoisted(() => ({
  saveUserSettings: vi.fn().mockResolvedValue(true),
  authStore: {
    isAuthenticated: false,
  },
}));

vi.mock('$lib/user-settings', () => ({
  saveUserSettings: mocks.saveUserSettings,
}));

vi.mock('$stores/auth.svelte', () => ({
  authStore: mocks.authStore,
}));
```

Per coding guidelines: "Frontend Vitest tests must hoist mocks with `vi.hoisted()` before `vi.mock()` declarations."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@frontend/src/stores/__tests__/theme.test.ts` around lines 4 - 12, Refactor the top-level mocks so they are created with vi.hoisted() and then referenced in the vi.mock() calls: create a hoisted mocks object (via vi.hoisted(() => ({ saveUserSettings: vi.fn().mockResolvedValue(true), authStore: { isAuthenticated: false } }))) and update the existing vi.mock('$lib/user-settings', ...) to return mocks.saveUserSettings and vi.mock('$stores/auth.svelte', ...) to return mocks.authStore; this ensures saveUserSettings and authStore are hoisted before the vi.mock() declarations.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@frontend/src/stores/__tests__/auth.test.ts`:
- Around line 8-26: The three top-level mock functions (mockLoginApi,
mockLogoutApi, mockGetProfileApi), mockClearUserSettings, and
mockLoadUserSettings must be created with vi.hoisted so they are defined before
the vi.mock factories; replace the existing const declarations with hoisted
destructuring assignments (e.g., const { mockLoginApi, mockLogoutApi,
mockGetProfileApi } = vi.hoisted(() => ({ mockLoginApi: vi.fn(), ... }))) and
keep the vi.mock('$lib/api'), vi.mock('$stores/userSettings.svelte') and
vi.mock('$lib/user-settings') factories using those hoisted symbols
(loginApiV1AuthLoginPost calling mockLoginApi, clearUserSettings calling
mockClearUserSettings, loadUserSettings calling mockLoadUserSettings) so the
mocks are evaluated in the required order.
---
Outside diff comments:
In `@frontend/src/lib/__tests__/user-settings.test.ts`:
- Around line 6-33: Move the standalone mock declarations (mockGetUserSettings,
mockUpdateUserSettings, mockSetUserSettings, mockSetTheme, and mockAuthStore)
into a single vi.hoisted() block placed before any vi.mock() calls so they are
hoisted; then update the vi.mock() callbacks (the ones creating
getUserSettingsApiV1UserSettingsGet, updateUserSettingsApiV1UserSettingsPut,
setUserSettings, setTheme, and authStore) to reference those hoisted mocks
instead of locally-scoped vars. Ensure vi.hoisted() returns the mock
functions/objects and keep the vi.mock() callbacks unchanged except for using
the hoisted identifiers.
In `@frontend/src/stores/__tests__/theme.test.ts`:
- Around line 4-12: Refactor the top-level mocks so they are created with
vi.hoisted() and then referenced in the vi.mock() calls: create a hoisted mocks
object (via vi.hoisted(() => ({ saveUserSettings:
vi.fn().mockResolvedValue(true), authStore: { isAuthenticated: false } }))) and
update the existing vi.mock('$lib/user-settings', ...) to return
mocks.saveUserSettings and vi.mock('$stores/auth.svelte', ...) to return
mocks.authStore; this ensures saveUserSettings and authStore are hoisted before
the vi.mock() declarations.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 233e5efe-2de8-4249-aab7-e273180357c8
📒 Files selected for processing (11)
- backend/workers/bootstrap.py
- frontend/src/components/__tests__/Header.test.ts
- frontend/src/components/__tests__/NotificationCenter.test.ts
- frontend/src/lib/__tests__/user-settings.test.ts
- frontend/src/routes/admin/__tests__/AdminEvents.test.ts
- frontend/src/routes/admin/__tests__/AdminExecutions.test.ts
- frontend/src/routes/admin/__tests__/AdminSagas.test.ts
- frontend/src/routes/admin/__tests__/AdminSettings.test.ts
- frontend/src/routes/admin/__tests__/AdminUsers.test.ts
- frontend/src/stores/__tests__/auth.test.ts
- frontend/src/stores/__tests__/theme.test.ts
🚧 Files skipped from review as they are similar to previous changes (1)
- backend/workers/bootstrap.py
…functionality instead of starting timers all the time
1 issue found across 15 files (changes from recent commits).
Prompt for AI agents (unresolved issues)
Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.
<file name="backend/tests/e2e/conftest.py">
<violation number="1" location="backend/tests/e2e/conftest.py:101">
P1: `wait_for_notification` can return on stale stream entries, causing false-positive readiness in E2E tests. Track a stream cursor (or start position) so only new messages satisfy the wait.</violation>
</file>
Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.
```diff
- async for _message in pubsub.listen():
-     return  # first message = notification persisted
+ while True:
+     if await _read_stream(redis_client, key):
```
P1: wait_for_notification can return on stale stream entries, causing false-positive readiness in E2E tests. Track a stream cursor (or start position) so only new messages satisfy the wait.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/tests/e2e/conftest.py, line 101:
<comment>`wait_for_notification` can return on stale stream entries, causing false-positive readiness in E2E tests. Track a stream cursor (or start position) so only new messages satisfy the wait.</comment>
<file context>
@@ -76,19 +88,19 @@ async def wait_for_notification(
- async for _message in pubsub.listen():
- return # first message = notification persisted
+ while True:
+ if await _read_stream(redis_client, key):
+ return
+ await asyncio.sleep(0.1)
</file context>
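The cursor-aware loop this P1 asks for can be sketched in isolation. Everything below is illustrative: `FakeStream` stands in for a Redis Stream, and `wait_for_new_entry` mirrors the intended shape of `wait_for_notification` (start the cursor at the current stream end, then only accept strictly newer entries), not the project's actual helpers.

```python
import asyncio


class FakeStream:
    """Minimal in-memory stand-in for a Redis Stream (XADD / read-after-id semantics)."""

    def __init__(self) -> None:
        self.entries: list[tuple[str, dict]] = []
        self._seq = 0

    def xadd(self, fields: dict) -> str:
        self._seq += 1
        entry_id = f"{self._seq}-0"
        self.entries.append((entry_id, fields))
        return entry_id

    def read_after(self, last_id: str) -> list[tuple[str, dict]]:
        # Return only entries strictly newer than the cursor.
        def newer(eid: str) -> bool:
            a, b = (int(p) for p in eid.split("-"))
            c, d = (int(p) for p in last_id.split("-"))
            return (a, b) > (c, d)

        return [(eid, f) for eid, f in self.entries if newer(eid)]


async def wait_for_new_entry(stream: FakeStream, timeout: float = 1.0) -> dict:
    async def _loop() -> dict:
        # Start the cursor at the CURRENT end of the stream so stale entries never match.
        last_id = stream.entries[-1][0] if stream.entries else "0-0"
        while True:
            fresh = stream.read_after(last_id)
            if fresh:
                return fresh[0][1]
            await asyncio.sleep(0.01)

    return await asyncio.wait_for(_loop(), timeout)


async def demo() -> dict:
    stream = FakeStream()
    stream.xadd({"stale": "1"})  # pre-existing entry must NOT satisfy the wait
    task = asyncio.ensure_future(wait_for_new_entry(stream))
    await asyncio.sleep(0.05)
    stream.xadd({"fresh": "1"})  # only this new entry should be returned
    return await task


result = asyncio.run(demo())
print(result)  # {'fresh': '1'}
```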
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
backend/app/services/event_replay/replay_service.py (1)
34-41: 🛠️ Refactor suggestion | 🟠 Major
Inject `Settings` in the `EventReplayService` constructor to match the service DI contract.
This constructor still omits `settings`, which diverges from the service injection rule used across backend services.
As per coding guidelines: "Services must be constructor-injected with: repo, producer, logger, metrics, settings — never instantiated directly".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/services/event_replay/replay_service.py` around lines 34 - 41, The EventReplayService __init__ is missing the Settings injection required by our DI contract; add a settings: Settings parameter to the constructor signature for EventReplayService and store it on self (e.g., self.settings = settings), updating any callers/instantiation sites to provide the Settings object; reference the existing symbols ReplayRepository, UnifiedProducer, ReplayMetrics, structlog.stdlib.BoundLogger, SSEService and ensure the constructor now accepts and retains Settings alongside those dependencies.
♻️ Duplicate comments (3)
backend/app/services/notification_service.py (2)
145-149: ⚠️ Potential issue | 🟡 Minor
Use structured fields in throttle warning logs.
This warning still interpolates `user_id` directly into the message. Keep the message static and pass fields as kwargs.
Based on learnings: "Never log user-controlled data via string interpolation — always use structured keyword args".
🔧 Suggested fix
```diff
 if throttled:
     self.logger.warning(
-        f"Notification rate limit exceeded for user {user_id}. "
-        f"Max {self.settings.NOTIF_THROTTLE_MAX_PER_HOUR} "
-        f"per {self.settings.NOTIF_THROTTLE_WINDOW_HOURS} hour(s)"
+        "Notification rate limit exceeded",
+        user_id=user_id,
+        max_per_hour=self.settings.NOTIF_THROTTLE_MAX_PER_HOUR,
+        window_hours=self.settings.NOTIF_THROTTLE_WINDOW_HOURS,
     )
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/services/notification_service.py` around lines 145 - 149, The warning in NotificationService uses string interpolation and logs user-controlled user_id directly; change the self.logger.warning call in notification_service.py (the block that references user_id and self.settings.NOTIF_THROTTLE_MAX_PER_HOUR / NOTIF_THROTTLE_WINDOW_HOURS) to use a static message string and pass dynamic values as structured kwargs (e.g., user_id, max_per_hour, window_hours) instead of embedding them into the message so the logger records fields separately.
300-314: ⚠️ Potential issue | 🟠 Major
System notifications are still being throttled twice.
Line 301 checks throttle in `_create_system_for_user`, and `create_notification` checks again on Line 182. One notification can consume two slots and distort throttling outcomes.
🔧 Suggested fix
```diff
 async def create_notification(
     self,
     user_id: str,
     subject: str,
     body: str,
     tags: list[str],
     action_url: str,
     severity: NotificationSeverity = NotificationSeverity.MEDIUM,
     channel: NotificationChannel = NotificationChannel.IN_APP,
     scheduled_for: datetime | None = None,
     metadata: NotificationContext | None = None,
+    _skip_throttle: bool = False,
 ) -> DomainNotification:
@@
-    await self._check_throttle(user_id, severity, "general")
+    if not _skip_throttle:
+        await self._check_throttle(user_id, severity, "general")
@@
     await self.create_notification(
         user_id=user_id,
         subject=title,
         body=str(base_context.get("message", "Alert")),
         tags=tags,
         action_url="/api/v1/notifications",
         severity=cfg.severity,
         channel=NotificationChannel.IN_APP,
         metadata=base_context,
+        _skip_throttle=True,  # Already checked in this path
     )
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/services/notification_service.py` around lines 300 - 314, The code double-checks throttling in _create_system_for_user (calling _check_throttle) and again inside create_notification, causing two throttle slots to be consumed; fix by consolidating the check: either remove the explicit _check_throttle call in _create_system_for_user, or (preferred) add a flag to create_notification (e.g., skip_throttle or perform_throttle: bool = True) that defaults to performing the throttle check, update create_notification to respect this flag and skip its internal _check_throttle when skip_throttle=True, then call create_notification from _create_system_for_user with the flag set so the throttle is only evaluated once and NotificationThrottledError handling remains correct.
backend/app/services/event_replay/replay_service.py (1)
195-200: ⚠️ Potential issue | 🔴 Critical
Guard speed multiplier before division in replay delay calculation.
`time_diff / session.config.speed_multiplier` can raise at runtime when the multiplier is `0` (and negative values should also be rejected).
🔧 Suggested fix
```diff
 def _calculate_replay_delay(self, session: ReplaySessionState) -> float:
     """Calculate the delay before dispatching the next event based on speed multiplier."""
     next_event = self._peek_next_event(session.session_id)
-    if next_event and session.last_event_at and session.config.speed_multiplier < 100:
+    multiplier = session.config.speed_multiplier
+    if next_event and session.last_event_at and 0 < multiplier < 100:
         time_diff = (next_event.timestamp - session.last_event_at).total_seconds()
-        return max(time_diff / session.config.speed_multiplier, 0)
+        return max(time_diff / multiplier, 0)
     return 0.0
```
Verification: confirm `ReplayConfig.speed_multiplier` is constrained (`gt=0`) or that runtime guards exist before division.
```bash
#!/bin/bash
set -euo pipefail

# 1) Check ReplayConfig constraints for speed_multiplier
fd -t f "models.py" backend/app/domain --exec rg -n "class ReplayConfig|speed_multiplier|Field\(|gt=|ge=" {} +

# 2) Check replay delay logic guard in service
rg -n -C3 "def _calculate_replay_delay|speed_multiplier" backend/app/services/event_replay/replay_service.py
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/services/event_replay/replay_service.py` around lines 195 - 200, The division by session.config.speed_multiplier in _calculate_replay_delay can raise on zero/negative values; update the method (function _calculate_replay_delay in replay_service.py) to explicitly guard ReplaySessionState.config.speed_multiplier before dividing (e.g., if speed_multiplier <= 0: log or raise a clear error and return 0 or fallback to a safe default) so you never divide by zero; alternatively ensure ReplayConfig.speed_multiplier has a gt=0 constraint, but if you keep runtime protection add the check immediately after obtaining session.config and before using it with _peek_next_event.
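As a runnable illustration of the guard being discussed, the delay calculation can be reduced to a pure function. This is a simplified sketch: the real method derives `time_diff` from event timestamps and a `ReplaySessionState`, which are omitted here.

```python
def calculate_replay_delay(time_diff: float, speed_multiplier: float) -> float:
    """Simplified stand-in for _calculate_replay_delay's division guard."""
    # Only divide when the multiplier is a sane positive value below the
    # "max speed" threshold; zero or negative multipliers fall through to 0.0.
    if 0 < speed_multiplier < 100:
        return max(time_diff / speed_multiplier, 0)
    return 0.0


print(calculate_replay_delay(2.0, 4.0))   # 0.5
print(calculate_replay_delay(2.0, 0.0))   # 0.0 (no ZeroDivisionError)
print(calculate_replay_delay(2.0, -1.0))  # 0.0
```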
🧹 Nitpick comments (9)
frontend/src/lib/admin/stores/eventsStore.svelte.ts (1)
42-43: Replace inline options object types with a named `interface`.
Lines 42 and 240 use inline object-shape types; please switch to an `interface` and reuse it across constructor/factory.
As per coding guidelines: "Use `interface` for object shapes, `type` for unions/intersections/aliases in TypeScript".
Also applies to: 240-241
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@frontend/src/lib/admin/stores/eventsStore.svelte.ts` around lines 42 - 43, Define a named interface (e.g. EventsStoreOptions) for the options object currently typed inline and replace the inline type in the constructor signature (constructor({ autoRefresh = true }: { autoRefresh?: boolean } = {})) with that interface; then reuse the same EventsStoreOptions interface for the factory/creator function signature referenced around lines 240-241 so both constructor and the factory accept the same typed options. Ensure the interface is exported/visible where both constructor and the factory (e.g., createEventsStore or similar) can reference it.
frontend/src/lib/admin/stores/sagasStore.svelte.ts (1)
23-24: Use a shared options `interface` for `autoRefresh`.
Inline object-shape annotations in constructor/factory should be replaced with a named interface for consistency and guideline compliance.
As per coding guidelines: "Use `interface` for object shapes, `type` for unions/intersections/aliases in TypeScript".
Also applies to: 82-83
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@frontend/src/lib/admin/stores/sagasStore.svelte.ts` around lines 23 - 24, Define a named interface (e.g., SagasStoreOptions { autoRefresh?: boolean }) and replace the inline object-shape annotation in the constructor signature (currently written as constructor({ autoRefresh = true }: { autoRefresh?: boolean } = {})) with that interface (constructor(options: SagasStoreOptions = { autoRefresh: true }) or equivalent destructuring using SagasStoreOptions); apply the same replacement to the other occurrence referenced (the factory/initializer at lines 82-83) so both use the shared SagasStoreOptions interface for consistency.
frontend/src/lib/admin/stores/executionsStore.svelte.ts (1)
26-27: Use a named `interface` for store options instead of inline object types.
This repeats the same object shape twice and violates the TS style rule for object shapes.
♻️ Suggested refactor
```diff
+interface ExecutionsStoreOptions {
+  autoRefresh?: boolean;
+}
+
 class ExecutionsStore {
@@
-  constructor({ autoRefresh = true }: { autoRefresh?: boolean } = {}) {
+  constructor({ autoRefresh = true }: ExecutionsStoreOptions = {}) {
@@
-export function createExecutionsStore(options?: { autoRefresh?: boolean }): ExecutionsStore {
+export function createExecutionsStore(options?: ExecutionsStoreOptions): ExecutionsStore {
   return new ExecutionsStore(options);
 }
```
As per coding guidelines: "Use `interface` for object shapes, `type` for unions/intersections/aliases in TypeScript".
Also applies to: 85-86
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@frontend/src/lib/admin/stores/executionsStore.svelte.ts` around lines 26 - 27, Create a named interface for the store options (e.g., ExecutionsStoreOptions) and replace the inline object type occurrences with that interface: update the constructor signature (the constructor({ autoRefresh = true }: { autoRefresh?: boolean } = {})) to use the new interface and update the other occurrence around the code referenced (lines 85-86) to accept/annotate the same ExecutionsStoreOptions type; ensure the interface declares autoRefresh?: boolean with the existing default behavior preserved.
backend/tests/unit/services/sse/test_sse_service.py (2)
156-180: Test imports private adapter from implementation module.
Line 162 uses `_notif_adapter`, which is a module-private adapter. While this is acceptable in unit tests for verifying internal behavior, consider whether the test could use the public `publish_notification` method instead of directly calling `xadd`.
The current approach directly exercises the stream format, which is useful for unit testing but creates coupling to internal structure.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/tests/unit/services/sse/test_sse_service.py` around lines 156 - 180, Test directly uses the private _notif_adapter and xadd to push a stream entry; change the test to use the public publish_notification API instead so it exercises the public contract rather than internal adapter details: create the DomainNotificationSSEPayload (or equivalent public DTO) and call publish_notification (or svc.publish_notification) to write to the stream, then use create_notification_stream and _FakeRedis as before to assert the yielded event; this removes coupling to _notif_adapter and xadd while keeping the same assertions on create_notification_stream.
209-220: Late import inside test function.
Line 210 imports `_replay_adapter` inside the test function. For consistency with the file's other imports (lines 14-15), consider moving this import to the top of the file.
♻️ Move import to file level
```diff
 from app.services.sse import SSEService
-from app.services.sse.sse_service import _exec_adapter, _notif_adapter
+from app.services.sse.sse_service import _exec_adapter, _notif_adapter, _replay_adapter
```
Then remove line 210.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/tests/unit/services/sse/test_sse_service.py` around lines 209 - 220, The test has a late inline import of _replay_adapter inside the test function; move the import for _replay_adapter to the module-level imports alongside the other top imports and remove the inline import statement from the test body so the test uses the module-level _replay_adapter import instead.
backend/tests/e2e/services/sse/test_partitioned_event_router.py (1)
49-51: Test accesses private `_poll_stream` method.
Similar to the notification test, this test directly calls `svc._poll_stream()`, which is a private method. While acceptable for e2e tests that need to verify internal behavior, consider documenting this as intentional coupling for integration testing purposes.
The test otherwise correctly validates the publish-to-stream flow.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/tests/e2e/services/sse/test_partitioned_event_router.py` around lines 49 - 51, This test intentionally calls the private method svc._poll_stream(...) to validate internal publish-to-stream behavior; update the test's docstring or add a brief inline comment above the call to explain that accessing the private method is deliberate for end-to-end/integration verification (i.e., intentional coupling), referencing svc._poll_stream and the read-back lines so future maintainers understand why the private API is used instead of a public wrapper.
backend/app/services/sse/sse_service.py (2)
72-80: Unbounded polling loop lacks cancellation-safe cleanup.
The `_poll_stream` method runs an infinite loop that only terminates when the caller stops iterating. This is correct for SSE streaming, but consider whether there should be any explicit cancellation handling or timeout mechanism to prevent resource leaks if consumers abandon the generator without proper cleanup.
The current pattern relies on the caller to properly close the async generator, which should work with `async for` and context managers, but worth noting in documentation.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/services/sse/sse_service.py` around lines 72 - 80, The _poll_stream async generator can leak if iteration is abandoned; update _poll_stream to be cancellation-safe by wrapping the polling loop in a try/finally (or try/except asyncio.CancelledError) and perform cleanup in the finally block (e.g., release any connection/stream handles returned by _read_after or notify any owner via an Event/close callback), and optionally accept an optional cancellation token/timeout parameter to break the loop; reference _poll_stream, _read_after, adapter, last_id, and _poll_interval when making the changes so the generator reliably cleans up resources when the caller stops iterating or the task is cancelled.
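The try/finally pattern the prompt describes can be sketched with a self-contained generator. This is illustrative only: `poll_stream` reads from an in-memory list rather than Redis, and the `cleaned_up` flag stands in for whatever real resource release the service needs.

```python
import asyncio

cleaned_up = False


async def poll_stream(source: list[str], poll_interval: float = 0.01):
    """Cancellation-safe polling generator: cleanup runs even if abandoned."""
    global cleaned_up
    last_index = 0
    try:
        while True:
            while last_index < len(source):
                yield source[last_index]
                last_index += 1
            await asyncio.sleep(poll_interval)
    finally:
        # Runs on GeneratorExit / CancelledError as well as normal close,
        # so resources are released even if the consumer walks away mid-stream.
        cleaned_up = True


async def demo() -> str:
    gen = poll_stream(["evt-1", "evt-2"])
    first = await gen.__anext__()
    await gen.aclose()  # consumer abandons iteration; finally block still runs
    return first


first = asyncio.run(demo())
print(first, cleaned_up)  # evt-1 True
```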
53-55: Consider adding error handling for Redis operations.
The `_xpublish` method makes two Redis calls (`xadd` and `expire`) without error handling. If Redis is unavailable or experiences issues, this will propagate exceptions to callers. Consider whether these should be wrapped in try/except with logging for resilience, or if propagating the exception is the intended behavior.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/services/sse/sse_service.py` around lines 53 - 55, Wrap the two Redis calls in _xpublish (self._redis.xadd and self._redis.expire) in a try/except block to catch exceptions (e.g., Exception as e), log the failure including the stream key, _MAXLEN and _STREAM_TTL context and the exception (use the service logger, e.g., self._logger.error(...)), and then re-raise the exception so callers can handle it (or, if you prefer resilience, return gracefully after logging instead); update the _xpublish method to perform this try/except around the xadd/expire sequence referencing _xpublish, _redis, xadd, expire, _MAXLEN and _STREAM_TTL.backend/tests/e2e/notifications/test_notification_sse.py (1)
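A log-then-reraise wrapper as suggested might look like the sketch below. Assumptions are marked: `FailingRedis` is a stand-in for an unavailable client, and `xpublish`, `_MAXLEN`, and `_STREAM_TTL` mirror the names in the review comment rather than the actual service code.

```python
import asyncio
import logging

logging.basicConfig(level=logging.CRITICAL)  # keep the demo output quiet
logger = logging.getLogger("sse")

_MAXLEN = 1000
_STREAM_TTL = 3600


class FailingRedis:
    """Stand-in client whose xadd always fails, simulating a Redis outage."""

    async def xadd(self, key, fields, maxlen=None, approximate=True):
        raise ConnectionError("redis unavailable")

    async def expire(self, key, ttl):
        return True


async def xpublish(redis, key: str, fields: dict) -> None:
    try:
        await redis.xadd(key, fields, maxlen=_MAXLEN, approximate=True)
        await redis.expire(key, _STREAM_TTL)
    except Exception:
        # Log the stream key and limits for diagnosis, then propagate so
        # callers can decide how to recover.
        logger.exception(
            "failed to publish SSE event",
            extra={"stream_key": key, "maxlen": _MAXLEN, "ttl": _STREAM_TTL},
        )
        raise


async def demo() -> str:
    try:
        await xpublish(FailingRedis(), "sse:notif:u1", {"data": "{}"})
    except ConnectionError as exc:
        return str(exc)
    return "no error"


outcome = asyncio.run(demo())
print(outcome)  # redis unavailable
```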
backend/tests/e2e/notifications/test_notification_sse.py (1)
36-38: Test couples to private SSE service internals.
The test directly accesses `sse._poll_stream()` (a private method with `_` prefix) and imports `_notif_adapter` from the implementation module. This creates tight coupling to internal implementation details.
Consider using the public `create_notification_stream()` method instead, which is the intended public API for consuming notification streams. This would make the test more resilient to internal refactoring.
♻️ Suggested approach using public API
```diff
-# Read back from stream
-gen = sse._poll_stream(f"sse:notif:{user_id}", _notif_adapter)
-msg: DomainNotificationSSEPayload = await asyncio.wait_for(gen.__anext__(), timeout=5.0)
+# Read back from stream using public API
+gen = sse.create_notification_stream(user_id=user_id)
+raw = await asyncio.wait_for(gen.__anext__(), timeout=5.0)
+# Parse the SSE payload
+msg_data = json.loads(raw["data"])
```
This would require importing `json` and adjusting assertions to work with the dict/JSON structure.
Verify each finding against the current code and only fix it if needed. In `@backend/tests/e2e/notifications/test_notification_sse.py` around lines 36 - 38, The test currently couples to internals by calling sse._poll_stream and importing _notif_adapter; replace that with the public create_notification_stream API: call create_notification_stream(f"sse:notif:{user_id}") to obtain the async generator/stream, import/use the public adapter (or let create_notification_stream provide the correct adapter), await the next event from that generator, parse the payload as JSON/dict (import json if needed) and adjust assertions to examine the resulting dict structure rather than implementation-specific payload objects; update references to _poll_stream and _notif_adapter to use create_notification_stream and the JSON/dict shape.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@backend/tests/e2e/conftest.py`:
- Around line 40-43: Update the docstring to reflect actual behavior: instead of
saying it "Reads from '0-0' on each iteration," state that the stream read
starts from "0-0" on the first iteration and then advances using the last_id
variable after each message so subsequent reads continue from the most recent
event; reference the loop variable last_id in the docstring to make behavior
explicit.
- Around line 98-103: wait_for_notification currently reads the stream from the
start every loop and thus returns on stale entries; change it to be cursor-aware
like wait_for_sse_event by tracking a last_id cursor (initialize it to the
stream end or fetch the current last id once), pass that cursor into
_read_stream on each iteration so only entries with id > last_id are considered,
update last_id when new entries are returned and return only when a new entry is
observed; reference the wait_for_notification function and the helper
_read_stream to implement this cursor-based loop.
In `@frontend/src/lib/admin/stores/__tests__/sagasStore.test.ts`:
- Around line 43-46: Update the afterEach teardown to call cleanup() for Vitest
hygiene: inside the existing afterEach that currently calls teardown?.() and
vi.clearAllTimers(), also invoke cleanup() (imported from the testing library)
to ensure DOM and test side-effects are fully reset; adjust the
teardown/afterEach block where teardown and vi.clearAllTimers() are referenced
so it calls cleanup() before/after teardown as appropriate.
---
Outside diff comments:
In `@backend/app/services/event_replay/replay_service.py`:
- Around line 34-41: The EventReplayService __init__ is missing the Settings
injection required by our DI contract; add a settings: Settings parameter to the
constructor signature for EventReplayService and store it on self (e.g.,
self.settings = settings), updating any callers/instantiation sites to provide
the Settings object; reference the existing symbols ReplayRepository,
UnifiedProducer, ReplayMetrics, structlog.stdlib.BoundLogger, SSEService and
ensure the constructor now accepts and retains Settings alongside those
dependencies.
---
Duplicate comments:
In `@backend/app/services/event_replay/replay_service.py`:
- Around line 195-200: The division by session.config.speed_multiplier in
_calculate_replay_delay can raise on zero/negative values; update the method
(function _calculate_replay_delay in replay_service.py) to explicitly guard
ReplaySessionState.config.speed_multiplier before dividing (e.g., if
speed_multiplier <= 0: log or raise a clear error and return 0 or fallback to a
safe default) so you never divide by zero; alternatively ensure
ReplayConfig.speed_multiplier has a gt=0 constraint, but if you keep runtime
protection add the check immediately after obtaining session.config and before
using it with _peek_next_event.
In `@backend/app/services/notification_service.py`:
- Around line 145-149: The warning in NotificationService uses string
interpolation and logs user-controlled user_id directly; change the
self.logger.warning call in notification_service.py (the block that references
user_id and self.settings.NOTIF_THROTTLE_MAX_PER_HOUR /
NOTIF_THROTTLE_WINDOW_HOURS) to use a static message string and pass dynamic
values as structured kwargs (e.g., user_id, max_per_hour, window_hours) instead
of embedding them into the message so the logger records fields separately.
- Around line 300-314: The code double-checks throttling in
_create_system_for_user (calling _check_throttle) and again inside
create_notification, causing two throttle slots to be consumed; fix by
consolidating the check: either remove the explicit _check_throttle call in
_create_system_for_user, or (preferred) add a flag to create_notification (e.g.,
skip_throttle or perform_throttle: bool = True) that defaults to performing the
throttle check, update create_notification to respect this flag and skip its
internal _check_throttle when skip_throttle=True, then call create_notification
from _create_system_for_user with the flag set so the throttle is only evaluated
once and NotificationThrottledError handling remains correct.
---
Nitpick comments:
In `@backend/app/services/sse/sse_service.py`:
- Around line 72-80: The _poll_stream async generator can leak if iteration is
abandoned; update _poll_stream to be cancellation-safe by wrapping the polling
loop in a try/finally (or try/except asyncio.CancelledError) and perform cleanup
in the finally block (e.g., release any connection/stream handles returned by
_read_after or notify any owner via an Event/close callback), and optionally
accept an optional cancellation token/timeout parameter to break the loop;
reference _poll_stream, _read_after, adapter, last_id, and _poll_interval when
making the changes so the generator reliably cleans up resources when the caller
stops iterating or the task is cancelled.
- Around line 53-55: Wrap the two Redis calls in _xpublish (self._redis.xadd and
self._redis.expire) in a try/except block to catch exceptions (e.g., Exception
as e), log the failure including the stream key, _MAXLEN and _STREAM_TTL context
and the exception (use the service logger, e.g., self._logger.error(...)), and
then re-raise the exception so callers can handle it (or, if you prefer
resilience, return gracefully after logging instead); update the _xpublish
method to perform this try/except around the xadd/expire sequence referencing
_xpublish, _redis, xadd, expire, _MAXLEN and _STREAM_TTL.
In `@backend/tests/e2e/notifications/test_notification_sse.py`:
- Around line 36-38: The test currently couples to internals by calling
sse._poll_stream and importing _notif_adapter; replace that with the public
create_notification_stream API: call
create_notification_stream(f"sse:notif:{user_id}") to obtain the async
generator/stream, import/use the public adapter (or let
create_notification_stream provide the correct adapter), await the next event
from that generator, parse the payload as JSON/dict (import json if needed) and
adjust assertions to examine the resulting dict structure rather than
implementation-specific payload objects; update references to _poll_stream and
_notif_adapter to use create_notification_stream and the JSON/dict shape.
In `@backend/tests/e2e/services/sse/test_partitioned_event_router.py`:
- Around line 49-51: This test intentionally calls the private method
svc._poll_stream(...) to validate internal publish-to-stream behavior; update
the test's docstring or add a brief inline comment above the call to explain
that accessing the private method is deliberate for end-to-end/integration
verification (i.e., intentional coupling), referencing svc._poll_stream and the
read-back lines so future maintainers understand why the private API is used
instead of a public wrapper.
In `@backend/tests/unit/services/sse/test_sse_service.py`:
- Around line 156-180: Test directly uses the private _notif_adapter and xadd to
push a stream entry; change the test to use the public publish_notification API
instead so it exercises the public contract rather than internal adapter
details: create the DomainNotificationSSEPayload (or equivalent public DTO) and
call publish_notification (or svc.publish_notification) to write to the stream,
then use create_notification_stream and _FakeRedis as before to assert the
yielded event; this removes coupling to _notif_adapter and xadd while keeping
the same assertions on create_notification_stream.
- Around line 209-220: The test has a late inline import of _replay_adapter
inside the test function; move the import for _replay_adapter to the
module-level imports alongside the other top imports and remove the inline
import statement from the test body so the test uses the module-level
_replay_adapter import instead.
In `@frontend/src/lib/admin/stores/eventsStore.svelte.ts`:
- Around line 42-43: Define a named interface (e.g. EventsStoreOptions) for the
options object currently typed inline and replace the inline type in the
constructor signature (constructor({ autoRefresh = true }: { autoRefresh?:
boolean } = {})) with that interface; then reuse the same EventsStoreOptions
interface for the factory/creator function signature referenced around lines
240-241 so both constructor and the factory accept the same typed options.
Ensure the interface is exported/visible where both constructor and the factory
(e.g., createEventsStore or similar) can reference it.
In `@frontend/src/lib/admin/stores/executionsStore.svelte.ts`:
- Around line 26-27: Create a named interface for the store options (e.g.,
ExecutionsStoreOptions) and replace the inline object type occurrences with that
interface: update the constructor signature (the constructor({ autoRefresh =
true }: { autoRefresh?: boolean } = {})) to use the new interface and update the
other occurrence around the code referenced (lines 85-86) to accept/annotate the
same ExecutionsStoreOptions type; ensure the interface declares autoRefresh?:
boolean with the existing default behavior preserved.
In `@frontend/src/lib/admin/stores/sagasStore.svelte.ts`:
- Around line 23-24: Define a named interface (e.g., SagasStoreOptions {
autoRefresh?: boolean }) and replace the inline object-shape annotation in the
constructor signature (currently written as constructor({ autoRefresh = true }:
{ autoRefresh?: boolean } = {})) with that interface (constructor(options:
SagasStoreOptions = { autoRefresh: true }) or equivalent destructuring using
SagasStoreOptions); apply the same replacement to the other occurrence
referenced (the factory/initializer at lines 82-83) so both use the shared
SagasStoreOptions interface for consistency.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 45975b93-5730-4a2e-8097-cbcf155bc2f5
📒 Files selected for processing (21)
- backend/app/core/providers.py
- backend/app/events/handlers.py
- backend/app/services/event_replay/replay_service.py
- backend/app/services/notification_service.py
- backend/app/services/sse/__init__.py
- backend/app/services/sse/redis_bus.py
- backend/app/services/sse/sse_service.py
- backend/tests/e2e/conftest.py
- backend/tests/e2e/core/test_dishka_lifespan.py
- backend/tests/e2e/notifications/test_notification_sse.py
- backend/tests/e2e/services/sse/test_partitioned_event_router.py
- backend/tests/unit/services/sse/test_redis_bus.py
- backend/tests/unit/services/sse/test_sse_publish.py
- backend/tests/unit/services/sse/test_sse_service.py
- docs/architecture/lifecycle.md
- frontend/src/lib/admin/stores/__tests__/eventsStore.test.ts
- frontend/src/lib/admin/stores/__tests__/executionsStore.test.ts
- frontend/src/lib/admin/stores/__tests__/sagasStore.test.ts
- frontend/src/lib/admin/stores/eventsStore.svelte.ts
- frontend/src/lib/admin/stores/executionsStore.svelte.ts
- frontend/src/lib/admin/stores/sagasStore.svelte.ts
💤 Files with no reviewable changes (2)
- backend/app/services/sse/redis_bus.py
- backend/tests/unit/services/sse/test_redis_bus.py
| """Poll execution's Redis Stream and await matching event. | ||
|
|
||
| The SSE bridge publishes all execution lifecycle events to | ||
| sse:exec:{execution_id}. Pure event-driven — no polling. | ||
| Reads from "0-0" on each iteration so late subscribers never miss events. | ||
| """ |
Docstring behavior is stale vs implementation.
Line 42 says reads start from "0-0" on each iteration, but the loop advances last_id after each message. Please update the wording to match actual behavior.
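The behavior the docstring should describe (start at "0-0" once, then advance the cursor) can be sketched without a live Redis. The helper names below are hypothetical stand-ins that mimic XREAD's "entries strictly newer than the given ID" semantics; this is not the project's real code.

```python
# Hypothetical sketch of a cursor-advancing Redis Stream read loop.
# `stream` stands in for a stream, entries keyed by "ms-seq" IDs.

def _id_key(stream_id: str) -> tuple[int, int]:
    """Order stream IDs the way Redis does: milliseconds part, then sequence."""
    ms, seq = stream_id.split("-")
    return int(ms), int(seq)

def read_after(stream: list[tuple[str, dict]], last_id: str) -> list[tuple[str, dict]]:
    """Return entries strictly newer than last_id (XREAD semantics)."""
    return [(eid, data) for eid, data in stream if _id_key(eid) > _id_key(last_id)]

stream = [("1-0", {"type": "started"}), ("2-0", {"type": "completed"})]

last_id = "0-0"  # start from the beginning only once, on the first iteration
seen = []
while True:
    batch = read_after(stream, last_id)
    if not batch:
        break
    seen.extend(batch)
    last_id = batch[-1][0]  # advance the cursor so the next read skips delivered entries

print(seen)     # both events, each delivered exactly once
print(last_id)  # "2-0"
```

Late subscribers still see the full history (the first read starts at "0-0"), but no entry is re-delivered within one loop, which is what the corrected docstring should say.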
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@backend/tests/e2e/conftest.py` around lines 40 - 43, Update the docstring to
reflect actual behavior: instead of saying it "Reads from '0-0' on each
iteration," state that the stream read starts from "0-0" on the first iteration
and then advances using the last_id variable after each message so subsequent
reads continue from the most recent event; reference the loop variable last_id
in the docstring to make behavior explicit.
Updated docstrings are not in Google style.
Please convert these docstrings to include structured sections (at least Args/Returns, and Raises where relevant).
As per coding guidelines, "Use Google-style docstrings with Args/Returns/Raises sections".
Also applies to: 91-97
| key = f"sse:notif:{user_id}" | ||
| async with asyncio.timeout(timeout): | ||
| async with redis_client.pubsub(ignore_subscribe_messages=True) as pubsub: | ||
| await pubsub.subscribe(channel) | ||
| async for _message in pubsub.listen(): | ||
| return # first message = notification persisted | ||
| while True: | ||
| if await _read_stream(redis_client, key): | ||
| return | ||
| await asyncio.sleep(0.1) |
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Examine the notification waiting logic
rg -n -C3 'async def wait_for_notification|_read_stream\(redis_client, key\)|create_execution_with_notification|wait_for_notification\(' backend/tests/e2e/conftest.py
Repository: HardMax71/Integr8sCode
Length of output: 925
🏁 Script executed:
#!/bin/bash
# Find and examine _read_stream function definition
rg -n -B2 -A10 'async def _read_stream' backend/tests/e2e/conftest.py
Repository: HardMax71/Integr8sCode
Length of output: 417
🏁 Script executed:
#!/bin/bash
# Check for stream cleanup/setup in the conftest file
rg -n 'asyncio.run\|redis_client.delete\|FLUSHDB\|xdel\|xtrim\|fixture\|setUp\|tearDown' backend/tests/e2e/conftest.py | head -30
Repository: HardMax71/Integr8sCode
Length of output: 48
🏁 Script executed:
#!/bin/bash
# Read the entire conftest.py to understand full context
wc -l backend/tests/e2e/conftest.py
Repository: HardMax71/Integr8sCode
Length of output: 99
🏁 Script executed:
#!/bin/bash
# Read the entire conftest.py file
cat -n backend/tests/e2e/conftest.py
Repository: HardMax71/Integr8sCode
Length of output: 8714
wait_for_notification uses stale stream entries instead of waiting for new ones.
The function reads from "0-0" (stream start) on every iteration, so any pre-existing entries in the stream satisfy the wait immediately. Compare with wait_for_sse_event (lines 33–54), which correctly tracks cursor position with last_id to avoid replay. This causes tests to pass incorrectly when the notification stream contains old entries.
Suggested fix (cursor-aware wait)
async def wait_for_notification(
redis_client: redis.Redis,
user_id: str,
*,
+ after: str = "0-0",
timeout: float = 30.0,
) -> None:
"""Wait for a notification on the user's Redis Stream.
The notification service publishes to sse:notif:{user_id} only after
persisting to MongoDB, so receiving a message is a correct readiness
signal — unlike RESULT_STORED which comes from an independent consumer
group with no ordering guarantee.
"""
key = f"sse:notif:{user_id}"
+ last_id = after
async with asyncio.timeout(timeout):
while True:
- if await _read_stream(redis_client, key):
+ batch = await _read_stream(redis_client, key, last_id)
+ if batch:
return
+ last_id = batch[-1][0] if batch else last_id
          await asyncio.sleep(0.1)

 async def create_execution_with_notification(
client: AsyncClient,
redis_client: redis.Redis,
request: ExecutionRequest,
*,
timeout: float = 30.0,
) -> tuple[ExecutionResponse, NotificationResponse]:
"""Create execution, wait for notification delivery, return (execution, notification).
Fetches user_id from /auth/me, waits on sse:notif:{user_id} (correct
signal — notification service publishes after MongoDB persist), then
queries the notification list.
"""
me_resp = await client.get("/api/v1/auth/me")
assert me_resp.status_code == 200
user_id = me_resp.json()["user_id"]
+ key = f"sse:notif:{user_id}"
+ existing = await _read_stream(redis_client, key, "0-0")
+ after = existing[-1][0] if existing else "0-0"
+
execution = await create_execution(client, request)
- await wait_for_notification(redis_client, user_id, timeout=timeout)
+ await wait_for_notification(redis_client, user_id, after=after, timeout=timeout)
resp = await client.get("/api/v1/notifications", params={"limit": 10})
assert resp.status_code == 200
result = NotificationListResponse.model_validate(resp.json())
assert result.notifications, "No notification after SSE delivery"
    return execution, result.notifications[0]
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@backend/tests/e2e/conftest.py` around lines 98 - 103, wait_for_notification
currently reads the stream from the start every loop and thus returns on stale
entries; change it to be cursor-aware like wait_for_sse_event by tracking a
last_id cursor (initialize it to the stream end or fetch the current last id
once), pass that cursor into _read_stream on each iteration so only entries with
id > last_id are considered, update last_id when new entries are returned and
return only when a new entry is observed; reference the wait_for_notification
function and the helper _read_stream to implement this cursor-based loop.
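The stale-read pitfall and its cursor-aware fix are easy to demonstrate in isolation. The sketch below uses a plain list as a hypothetical stand-in for the Redis Stream, and a read_stream helper that mimics what a cursor-accepting _read_stream would return; names and signatures are illustrative, not the repository's actual code.

```python
# Hypothetical stand-in for a cursor-aware _read_stream: returns entries
# strictly newer than the given cursor, like XREAD.

def read_stream(stream, last_id="0-0"):
    def key(stream_id):
        ms, seq = stream_id.split("-")
        return int(ms), int(seq)
    return [(eid, d) for eid, d in stream if key(eid) > key(last_id)]

# A stale entry left over from a prior test is already in the stream.
stream = [("1-0", {"note": "old notification"})]

# Fixed "0-0" cursor: the stale entry satisfies the wait immediately,
# so the test "passes" before any new notification arrives.
assert read_stream(stream, "0-0")

# Cursor-aware wait: record the current tail first; only newer entries count.
existing = read_stream(stream, "0-0")
after = existing[-1][0] if existing else "0-0"
assert not read_stream(stream, after)  # nothing new yet, keep waiting

stream.append(("2-0", {"note": "fresh notification"}))
assert read_stream(stream, after)      # only now does the wait complete
```

Capturing the tail before triggering the action under test is the same pattern the suggested patch applies in create_execution_with_notification.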
afterEach(() => {
  teardown?.();
  vi.clearAllTimers();
});
🧩 Analysis chain
🏁 Script executed:
cat -n frontend/src/lib/admin/stores/__tests__/sagasStore.test.ts | head -60
Repository: HardMax71/Integr8sCode
Length of output: 2342
🏁 Script executed:
# Check if cleanup is imported anywhere in the file
rg "cleanup" frontend/src/lib/admin/stores/__tests__/sagasStore.test.tsRepository: HardMax71/Integr8sCode
Length of output: 48
🏁 Script executed:
# Check the beforeEach structure to see if vi.clearAllMocks() exists
rg "beforeEach|afterEach" -A 5 frontend/src/lib/admin/stores/__tests__/sagasStore.test.tsRepository: HardMax71/Integr8sCode
Length of output: 613
🏁 Script executed:
rg "render\(|@testing-library" frontend/src/lib/admin/stores/__tests__/sagasStore.test.tsRepository: HardMax71/Integr8sCode
Length of output: 48
🏁 Script executed:
# Also check the full afterEach and surrounding context
wc -l frontend/src/lib/admin/stores/__tests__/sagasStore.test.ts
Repository: HardMax71/Integr8sCode
Length of output: 128
🏁 Script executed:
# Get the complete file to understand test scope
cat frontend/src/lib/admin/stores/__tests__/sagasStore.test.ts
Repository: HardMax71/Integr8sCode
Length of output: 8408
Add cleanup() in afterEach for Vitest test hygiene.
The afterEach hook is missing the cleanup() call required by the frontend test guideline. While this store test doesn't render components, cleanup() should still be called for consistency with the testing standards.
🧪 Suggested patch
+import { cleanup } from '@testing-library/svelte';
@@
afterEach(() => {
+ cleanup();
teardown?.();
vi.clearAllTimers();
 });
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@frontend/src/lib/admin/stores/__tests__/sagasStore.test.ts` around lines 43 -
46, Update the afterEach teardown to call cleanup() for Vitest hygiene: inside
the existing afterEach that currently calls teardown?.() and
vi.clearAllTimers(), also invoke cleanup() (imported from the testing library)
to ensure DOM and test side-effects are fully reset; adjust the
teardown/afterEach block where teardown and vi.clearAllTimers() are referenced
so it calls cleanup() before/after teardown as appropriate.
Summary by cubic
Standardizes worker startup with a shared run_worker and replaces the Redis SSE bus with a stream-backed SSEService for simpler, more reliable fan-out. Cleans up event models, services, tests, and frontend stores to reduce duplication and improve stability.
Written for commit d024b8a. Summary will update on new commits.
Summary by CodeRabbit
New Features
Refactor
User-facing behavior
Documentation
Tests